# ── chunker.py ───────────────────────────────────────────────────


def _assign_pages(chunks: "list[Chunk]", text: str, page_offsets: list[int]) -> None:
    """Tag each chunk with the page that contains its first character.

    Each chunk's ``content`` is located inside ``text`` and the resulting
    character offset is translated to a 1-based page number via
    ``page_at_offset``. Chunks are mutated in place; nothing is returned.

    Args:
        chunks: Chunks in document order; each exposes ``content`` and
            receives ``page_number``.
        text: The full text the chunks were cut from.
        page_offsets: ``page_offsets[i]`` is the char offset at which
            page ``i + 1`` starts inside ``text``.

    Chunks whose content cannot be found anywhere in ``text`` are left
    untouched (their ``page_number`` keeps whatever value it had).
    """
    from legal_mcp.services.extractor import page_at_offset

    cursor = 0
    for chunk in chunks:
        content = chunk.content
        # Search forward first so repeated passages resolve to the
        # occurrence that follows the previous chunk, not an earlier one.
        start = text.find(content, cursor)
        if start < 0:
            # Forward scan missed (the chunk begins before the cursor);
            # retry from the start of the document.
            start = text.find(content)
        if start < 0:
            continue
        chunk.page_number = page_at_offset(start, page_offsets)
        # Chunks overlap, so only advance to roughly the midpoint of this
        # chunk. This assumes the overlap is under half a chunk; when it
        # is not, the global fallback above still finds the next chunk.
        cursor = start + max(1, len(content) // 2)


# ── extractor.py ─────────────────────────────────────────────────

# Separator used when joining per-page text. Constant so chunker /
# retrofit can reproduce the join when computing page offsets.
PAGE_SEPARATOR = "\n\n"


def _join_pages(pages_text: list[str]) -> tuple[str, list[int]]:
    """Join per-page text with ``PAGE_SEPARATOR`` and record page starts.

    Args:
        pages_text: Extracted text of each page, in page order.

    Returns:
        ``(joined, offsets)`` where ``offsets[i]`` is the char offset at
        which page ``i + 1`` starts inside ``joined``. An empty input
        yields ``("", [])``.
    """
    offsets: list[int] = []
    cursor = 0
    for page_text in pages_text:
        offsets.append(cursor)
        # The value computed after the last page is never recorded, so
        # adding the separator length unconditionally is harmless.
        cursor += len(page_text) + len(PAGE_SEPARATOR)
    # str.join inserts the separator only between pages, which is exactly
    # what the running cursor above accounts for.
    return PAGE_SEPARATOR.join(pages_text), offsets


def page_at_offset(offset: int, page_offsets: list[int]) -> int:
    """Return the 1-based page number that contains a char offset.

    ``page_offsets[i]`` is the start of page ``i + 1`` in the joined
    text, so ``offset`` belongs to the highest-indexed page whose start
    is ``<= offset``.

    Args:
        offset: Character offset into the joined text.
        page_offsets: Page start offsets in ascending order (the order
            ``_join_pages`` produces them in).

    Returns:
        The page number, never less than 1. An empty or missing
        ``page_offsets`` and an offset before the first page both map
        to page 1.
    """
    from bisect import bisect_right

    if not page_offsets:
        return 1
    # bisect_right counts the page starts that are <= offset, which is the
    # 1-based index of the containing page.
    return max(1, bisect_right(page_offsets, offset))
Exception as e: logger.warning("Classification failed (non-fatal): %s", e) - # Step 2: Chunk + # Step 2: Chunk (page_offsets propagates page_number into chunks) logger.info("Chunking document (%d chars)", len(text)) - chunks = chunker.chunk_document(text) + chunks = chunker.chunk_document(text, page_offsets=page_offsets) if not chunks: await db.update_document(document_id, extraction_status="completed") diff --git a/mcp-server/src/legal_mcp/tools/documents.py b/mcp-server/src/legal_mcp/tools/documents.py index 67e9dbc..802ac6f 100644 --- a/mcp-server/src/legal_mcp/tools/documents.py +++ b/mcp-server/src/legal_mcp/tools/documents.py @@ -144,7 +144,7 @@ async def document_upload_training( shutil.copy2(str(source), str(dest)) # Extract text and strip Nevo preamble - text, page_count = await extractor.extract_text(str(dest)) + text, page_count, _ = await extractor.extract_text(str(dest)) text = extractor.strip_nevo_preamble(text) # Parse date diff --git a/mcp-server/src/legal_mcp/tools/workflow.py b/mcp-server/src/legal_mcp/tools/workflow.py index 00735ad..d2c7e6d 100644 --- a/mcp-server/src/legal_mcp/tools/workflow.py +++ b/mcp-server/src/legal_mcp/tools/workflow.py @@ -308,7 +308,7 @@ async def ingest_final_version( # Extract text from file if provided if file_path and not final_text: from legal_mcp.services import extractor - final_text, _ = await extractor.extract_text(file_path) + final_text, _, _ = await extractor.extract_text(file_path) if not final_text: return "לא סופק טקסט — יש לספק file_path או final_text." diff --git a/scripts/SCRIPTS.md b/scripts/SCRIPTS.md index 6297a81..859c261 100644 --- a/scripts/SCRIPTS.md +++ b/scripts/SCRIPTS.md @@ -23,6 +23,7 @@ | `voyage_rerank_judge_poc.py` | python | POC #4 — voyage-3 vs rerank-2 vs context-3 על אהרון ברק, 18 שאילתות, claude-haiku-4-5 כ-judge. הכרעה: rerank-2 ניצח עם +9% mean@3 | בנצ'מרק חד-פעמי | | `voyage_rerank_corpus_poc.py` | python | POC #5 — voyage-3 vs rerank-2 על קורפוס מלא (785 docs). 
הכרעה: +4.5% mean@3 כללי, +11.6% על P queries (practical) | בנצ'מרק חד-פעמי, אישר את שלב B | | `multimodal_backfill.py` | python | Backfill voyage-multimodal-3 page embeddings על מסמכי תיקים קיימים. idempotent (skips by default), forces `MULTIMODAL_ENABLED=true` ל-run, רץ מהקונטיינר. שלב C — ראה `docs/voyage-upgrades-plan.md` | ידני per-case (`python multimodal_backfill.py 8174-24 8137-24`) | +| `backfill_chunk_pages.py` | python | Backfill `page_number` ב-`document_chunks` קיימים. legacy chunker לא tracked עמודים → `page_number=NULL` חוסם boost של multimodal hybrid (text+image join על אותו עמוד). re-extracts כל PDF (re-OCR אם צריך, ~$0.0015/page), מחשב page_offsets, ומעדכן chunks. idempotent | ידני per-case (`python backfill_chunk_pages.py 8174-24 8137-24`) | ## תיקיית `.archive/` — סקריפטים שהושלמו diff --git a/scripts/backfill_chunk_pages.py b/scripts/backfill_chunk_pages.py new file mode 100644 index 0000000..eb1cd85 --- /dev/null +++ b/scripts/backfill_chunk_pages.py @@ -0,0 +1,204 @@ +"""Backfill page_number on existing document_chunks. + +Why this exists: the legacy chunker did not track which page each chunk +came from. After the page-tracking fix, new uploads carry page_number +correctly, but existing chunks have ``page_number=NULL`` in the DB. +That blocks the multimodal hybrid retriever's text+image boost (it +joins (chunk, image) on (document_id, page_number)). + +What it does (per case): + 1. List every document in the case + 2. For each document with NULL page_number chunks: + a. Re-extract via extractor.extract_text (re-runs OCR if needed — + ~$0.0015/page on Google Vision; idempotent on the DB side) + b. Compute page_offsets from the re-extracted text + c. For every chunk row (sorted by chunk_index), search its + content in the re-extracted text → look up page → UPDATE + 3. Skip documents whose chunks already have non-null page_number + +Idempotent: a second run with no --force is a no-op. 
+ +Designed to run from inside the FastAPI/MCP container (where /data +is mounted and Google Vision creds are present). Locally it requires +GOOGLE_CLOUD_VISION_API_KEY in ~/.env. + +Usage: + docker exec -it python /tmp/backfill_chunk_pages.py 8174-24 8137-24 +""" +from __future__ import annotations + +import argparse +import asyncio +import logging +import os +import sys +import time +from pathlib import Path +from uuid import UUID + + +def _setup_paths(): + here = Path(__file__).resolve().parent + mcp_src = here.parent / "mcp-server" / "src" + if mcp_src.is_dir() and str(mcp_src) not in sys.path: + sys.path.insert(0, str(mcp_src)) + + +_setup_paths() +from legal_mcp.services import db, extractor # noqa: E402 + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", +) +logger = logging.getLogger("backfill_chunk_pages") + + +def _resolve_local_path(db_path: str) -> Path: + p = Path(db_path) + if p.is_file(): + return p + if str(p).startswith("/data/"): + local = Path("/home/chaim/legal-ai") / Path(*p.parts[1:]) + if local.is_file(): + return local + return p + + +async def _backfill_document( + document_id: UUID, + title: str, + db_file_path: str, + force: bool, +) -> dict: + pool = await db.get_pool() + + # Fetch chunks for this document + chunks = await pool.fetch( + "SELECT id, chunk_index, content, page_number FROM document_chunks " + "WHERE document_id = $1 ORDER BY chunk_index", + document_id, + ) + if not chunks: + return {"status": "no_chunks"} + + n_null = sum(1 for c in chunks if c["page_number"] is None) + if not force and n_null == 0: + logger.info(" skip (all %d chunks already tagged): %s", len(chunks), title) + return {"status": "skipped", "chunks": len(chunks)} + + pdf_path = _resolve_local_path(db_file_path) + if not pdf_path.is_file(): + logger.warning(" file missing: %s (%s)", pdf_path, title) + return {"status": "missing"} + if pdf_path.suffix.lower() != ".pdf": + return {"status": "not_pdf"} + + 
logger.info(" re-extracting %s (%d chunks, %d need page)", + title, len(chunks), n_null) + t0 = time.time() + text, page_count, page_offsets = await extractor.extract_text(str(pdf_path)) + elapsed = time.time() - t0 + if not page_offsets: + return {"status": "no_offsets"} + + # Walk chunks, find each in the re-extracted text, assign page + pos = 0 + updated = 0 + not_found = 0 + for c in chunks: + content = c["content"] + if not content: + continue + idx = text.find(content, pos) + if idx < 0: + idx = text.find(content) # global fallback + if idx < 0: + not_found += 1 + continue + page = extractor.page_at_offset(idx, page_offsets) + await pool.execute( + "UPDATE document_chunks SET page_number = $1 WHERE id = $2", + page, c["id"], + ) + updated += 1 + # advance roughly past midpoint — chunks have overlap + pos = idx + max(1, len(content) // 2) + + logger.info( + " done in %.1fs: extracted %d pages, updated %d/%d chunks, " + "%d not found", elapsed, page_count, updated, len(chunks), not_found, + ) + return { + "status": "ok", + "elapsed_sec": round(elapsed, 1), + "pages": page_count, + "chunks_total": len(chunks), + "chunks_updated": updated, + "chunks_not_found": not_found, + } + + +async def backfill_cases(case_numbers: list[str], force: bool) -> dict: + pool = await db.get_pool() + summary: dict = {} + for cn in case_numbers: + logger.info("=" * 60) + logger.info("Case %s", cn) + case = await db.get_case_by_number(cn) + if not case: + logger.warning("Case not found: %s", cn) + summary[cn] = {"status": "case_not_found"} + continue + case_id = UUID(str(case["id"])) + docs = await pool.fetch( + "SELECT id, title, file_path FROM documents WHERE case_id = $1 ORDER BY title", + case_id, + ) + logger.info(" %d documents", len(docs)) + per_doc: list[dict] = [] + for d in docs: + r = await _backfill_document( + UUID(str(d["id"])), d["title"], d["file_path"], force, + ) + per_doc.append({"document_id": str(d["id"]), "title": d["title"], **r}) + summary[cn] = { + 
"documents_total": len(docs), + "ok": sum(1 for r in per_doc if r["status"] == "ok"), + "skipped": sum(1 for r in per_doc if r["status"] == "skipped"), + "missing": sum(1 for r in per_doc if r["status"] == "missing"), + "no_chunks": sum(1 for r in per_doc if r["status"] == "no_chunks"), + "no_offsets": sum(1 for r in per_doc if r["status"] == "no_offsets"), + "chunks_updated": sum(r.get("chunks_updated", 0) for r in per_doc), + "documents": per_doc, + } + return summary + + +def main(): + parser = argparse.ArgumentParser(description="Backfill page_number on existing chunks") + parser.add_argument("cases", nargs="+", help="Case numbers (e.g. 8174-24 8137-24)") + parser.add_argument( + "--force", action="store_true", + help="Re-extract even if all chunks already have page_number (default: skip)", + ) + args = parser.parse_args() + + summary = asyncio.run(backfill_cases(args.cases, force=args.force)) + print() + print("=" * 60) + print("SUMMARY") + print("=" * 60) + for cn, s in summary.items(): + if s.get("status") == "case_not_found": + print(f" {cn}: NOT FOUND") + continue + print( + f" {cn}: {s['documents_total']} docs — " + f"ok {s['ok']}, skipped {s['skipped']}, missing {s['missing']}, " + f"chunks_updated {s['chunks_updated']}" + ) + + +if __name__ == "__main__": + main() diff --git a/web/app.py b/web/app.py index aaeb9fe..64a24ef 100644 --- a/web/app.py +++ b/web/app.py @@ -3500,7 +3500,7 @@ async def _process_training_document(task_id: str, source: Path, req: ClassifyRe # Extract text await _progress.set(task_id, {"status": "processing", "filename": req.filename, "step": "extracting"}) - text, page_count = await extractor.extract_text(str(dest)) + text, page_count, _ = await extractor.extract_text(str(dest)) # Parse date d_date = None