#86.2 backfill + #86.3 benchmark, plus a #86.1 over-strip fix found en route.
extractor.py
- extract_nevo_ratio(): capture Nevo's מיני-רציו block (editorial holdings
summary) before it is stripped — a free professional gold-set (#86.3).
- _DECISION_START hardening (#86.2): the merged #86.1 regex over-stripped.
(a) פסק-דין headers are markdown-wrapped (**פסק דין**); the old anchor
required the keyword as the first line char with one separator, so it
missed the header and matched a citation 32K deep (עמ"נ 50567-07-21,
losing 45% of the body). Now tolerates leading markdown + 0-3 seps,
and the final-nun form (דין ן vs דינו נ).
(b) bare השופט/הנשיא matched CITATIONS ("השופט מ' חשין, פסקה 23"). The
authoring-judge line ends with a colon; we now require it.
ingest.py
- capture the ratio before stripping and store it on the row (best-effort,
non-fatal); also strip the text-upload path (was file-only).
db.py
- add case_law.nevo_ratio column (additive); allow it in update_case_law.
scripts/backfill_nevo_preamble.py (#86.2) — dry-run-by-default data migration:
finds historically-leaked rulings, captures ratio→nevo_ratio, rewrites
full_text (+content_hash), reindexes, and FLAGS (never deletes) halachot whose
quote lives in the removed preamble (review_status=pending_review +
nevo_preamble_leak flag). Safety guard: rows whose keep% is below --min-keep (60)
are excluded from --apply as suspected over-strip. --apply writes backup+manifest
to data/audit/ first. Chair-gated — NOT applied here.
scripts/nevo_ratio_benchmark.py (#86.3) — LLM-as-judge (local claude_session,
zero cost) measures recall/precision/granularity of our halachot vs the Nevo
ratio. Works pre- and post-backfill (reads nevo_ratio, falls back to full_text).
Verified:
- pytest tests/test_nevo_preamble.py — 12 passed (incl. citation/markdown
over-strip regressions).
- backfill dry-run: 19 leaked rulings, 27 contaminated halachot, all ≥75%
keep (the 32K over-strip is gone).
- benchmark on בג"ץ 1764/05: recall=0.875 precision=1.0 granularity=1.75x.
Invariants: G1 (normalize at source — strip/capture at ingest, not at read);
no silent swallow (contaminated halachot flagged + reported, not dropped);
data-migration is dry-run-default with backup+manifest, chair-gated.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
296 lines
12 KiB
Python
296 lines
12 KiB
Python
"""Canonical ingest pipeline (FU-1).
|
|
|
|
One pipeline for all sibling-entity intake types (external precedent,
|
|
internal committee decision). Per-type variation rides on an ``IntakeSpec``
|
|
config object — never a parallel function. See
|
|
docs/spec/01-ingest.md and docs/superpowers/specs/2026-05-30-fu1-unified-ingest-design.md.
|
|
|
|
claude_session rule preserved: this module only QUEUES extraction
|
|
(``request_*_extraction`` = pure DB writes). It never imports
|
|
halacha_extractor / precedent_metadata_extractor, so it is safe to call
|
|
from the FastAPI container where the ``claude`` CLI is unavailable.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import re
|
|
import shutil
|
|
from dataclasses import dataclass
|
|
from datetime import date
|
|
from pathlib import Path
|
|
from typing import Awaitable, Callable
|
|
from uuid import UUID, uuid4
|
|
|
|
from legal_mcp import config
|
|
from legal_mcp.services import chunker, db, embeddings, extractor
|
|
|
|
# Module-scoped logger, named after this module.
logger = logging.getLogger(__name__)

# Async progress callback, awaited as ``cb(status, percent, message)``.
ProgressCb = Callable[[str, int, str], Awaitable[None]]
|
|
|
|
|
|
async def _noop_progress(_status: str, _percent: int, _msg: str) -> None:
    """Default progress callback: accept the update and discard it."""
|
|
|
|
|
|
@dataclass(frozen=True)
class IntakeSpec:
    """Immutable bundle of everything that differs between intake types.

    The shared pipeline reads these hooks instead of branching on the intake
    type itself.
    """

    # Label of the intake type; included in the pipeline's failure log line.
    source_kind: str
    # Not read in this module's visible code — presumably the name of the
    # identifying input key; confirm against the concrete spec definitions.
    id_field: str
    # Base directory under which uploaded files are staged.
    staging_root: Path
    # Maps the (derived) inputs to the sub-directory name used for staging.
    staging_subdir: Callable[[dict], str]
    # Type-specific input validation; called with the raw inputs.
    validate: Callable[[dict], None]
    # Input key -> set of accepted values (missing/falsy is checked as "").
    enum_fields: dict[str, frozenset[str]]
    # Returns extra/overriding fields that are merged on top of the inputs.
    derive: Callable[[dict], dict]
    # Input key whose value names the record when ``case_name`` is blank.
    display_name_fallback: str
    # Async factory that creates the DB row; its result must carry an "id".
    create_record: Callable[..., Awaitable[dict]]
|
|
|
|
|
|
def _coerce_date(value) -> date | None:
    """Best-effort conversion of a loosely-typed value into a plain ``date``.

    Args:
        value: ``None``, an empty string, a ``date`` / ``datetime`` instance,
            or a string whose first 10 characters (after trimming surrounding
            whitespace) form an ISO date such as ``YYYY-MM-DD``.

    Returns:
        The corresponding ``date``, or ``None`` when the value is missing, of
        an unsupported type, or cannot be parsed. Never raises for bad input.
    """
    # Imported locally because the module header only imports ``date``.
    from datetime import datetime

    if isinstance(value, datetime):
        # ``datetime`` subclasses ``date``; without this branch a timestamp
        # would leak through unchanged even though a calendar date is promised.
        return value.date()
    if isinstance(value, date):
        return value
    if isinstance(value, str):
        # Keep only the date part so full ISO timestamps are accepted too.
        candidate = value.strip()[:10]
        try:
            return date.fromisoformat(candidate)
        except ValueError:
            return None
    return None
|
|
|
|
|
|
def _safe_filename(name: str) -> str:
    """Reduce ``name`` to a single filesystem-friendly path component.

    Directory parts are dropped, and every character outside word characters,
    ``.``, ``-``, ``+``, Hebrew letters and spaces becomes ``_``. When nothing
    is left, a random ``upload-<8 hex chars>`` name is returned instead.
    """
    leaf = Path(name).name
    cleaned = re.sub(r"[^\w.\-+א-ת ]", "_", leaf)
    if cleaned:
        return cleaned
    return f"upload-{uuid4().hex[:8]}"
|
|
|
|
|
|
def _stage_file(src_path: Path, root: Path, subdir: str) -> Path:
    """Copy ``src_path`` into ``root/<subdir>`` and return the new path.

    An empty ``subdir`` falls back to ``"other"``. The directory is created on
    demand, and the copy is named ``<8 hex chars>_<sanitised original name>``.
    """
    target_dir = root / subdir if subdir else root / "other"
    target_dir.mkdir(parents=True, exist_ok=True)
    unique_prefix = uuid4().hex[:8]
    target = target_dir / f"{unique_prefix}_{_safe_filename(src_path.name)}"
    shutil.copy2(src_path, target)
    return target
|
|
|
|
|
|
def _validate_enums(spec: IntakeSpec, inputs: dict) -> None:
    """Check every enum-constrained input against the values ``spec`` allows.

    A missing or falsy input is checked as the empty string.

    Raises:
        ValueError: on the first field whose value is not in its allowed set.
    """
    for field_name, allowed in spec.enum_fields.items():
        value = inputs.get(field_name) or ""
        if value in allowed:
            continue
        raise ValueError(f"invalid {field_name}: {value!r}")
|
|
|
|
|
|
async def _embed_pages(case_law_id: UUID, pdf_path: Path, page_count: int) -> dict:
    """Render the PDF's pages, embed the page images and store the vectors.

    Rendering runs in a worker thread. Thumbnail paths are stored relative to
    ``config.DATA_DIR`` when possible, otherwise as given. ``page_count`` is
    accepted but not used here. The caller treats any failure as non-fatal.

    Returns:
        ``{"pages_embedded": <value returned by the DB store call>}``.
    """

    def _thumb_ref(thumb):
        # No thumbnail for this page -> store NULL.
        if thumb is None:
            return None
        try:
            return str(thumb.relative_to(config.DATA_DIR))
        except ValueError:
            # Thumbnail lives outside DATA_DIR; keep the path unchanged.
            return str(thumb)

    rendered = await asyncio.to_thread(
        extractor.render_pages_for_multimodal,
        pdf_path,
        config.MULTIMODAL_DPI,
        config.MULTIMODAL_THUMB_DPI,
        spec_thumb_dir(case_law_id),
    )
    page_images = [image for image, _ in rendered]
    page_thumbs = [thumb for _, thumb in rendered]
    vectors = await embeddings.embed_images(page_images)

    # Page numbers are 1-based and follow render order.
    page_records = [
        {
            "page_number": number,
            "embedding": vector,
            "image_thumbnail_path": _thumb_ref(thumb),
        }
        for number, (vector, thumb) in enumerate(zip(vectors, page_thumbs), start=1)
    ]

    stored = await db.store_precedent_image_embeddings(
        case_law_id, page_records, model_name=config.MULTIMODAL_MODEL,
    )
    logger.info("Multimodal: stored %d page-image embeddings for case_law %s", stored, case_law_id)
    return {"pages_embedded": stored}
|
|
|
|
|
|
def spec_thumb_dir(case_law_id: UUID) -> Path:
    """Return the thumbnail directory for one case_law row.

    The path is always ``<DATA_DIR>/precedent-library/thumbnails/<id>``,
    whatever the intake type.
    """
    data_root = Path(config.DATA_DIR)
    return data_root.joinpath("precedent-library", "thumbnails", str(case_law_id))
|
|
|
|
|
|
async def ingest_document(
    spec: IntakeSpec,
    *,
    inputs: dict,
    file_path: str | Path | None = None,
    text: str | None = None,
    document_id: UUID | None = None,
    progress: ProgressCb | None = None,
) -> dict:
    """Run the canonical 12-step pipeline for one intake item.

    ``inputs`` carries the type-specific record fields (citation/case_number,
    case_name, court, practice_area, etc.). ``spec`` decides how they are
    validated, staged, derived, and which DB-create runs.

    Args:
        spec: Per-intake-type hooks (validation, staging, derivation, create).
        inputs: Record fields. Keys the pipeline supplies itself (``full_text``,
            ``case_name``, ``decision_date``, ``document_id``, ``file_path``,
            ``text``) are never forwarded verbatim to ``spec.create_record``.
        file_path: Source file to stage and extract text from. Takes precedence
            over ``text`` when both are given.
        text: Already-extracted text, used when no file is given.
        document_id: Forwarded unchanged to ``spec.create_record``.
        progress: Optional async ``(status, percent, message)`` callback.

    Returns:
        A dict with at least: status, case_law_id, chunks.

    Raises:
        ValueError: neither ``file_path`` nor ``text`` was given, validation
            failed, or no text remained after stripping.
        FileNotFoundError: ``file_path`` does not point to a file.
        Exception: any failure after the row was created is re-raised once the
            row has been marked ``failed`` (best-effort) and progress notified.
    """
    progress = progress or _noop_progress

    # Step 1: input validation (type-specific) + enums (uniform mechanism).
    if not file_path and text is None:
        raise ValueError("either file_path or text is required")
    spec.validate(inputs)
    _validate_enums(spec, inputs)

    # Step 2: field derivation (identity for external).
    inputs = {**inputs, **spec.derive(inputs)}

    # Steps 3-5: stage (if file) + extract + strip.
    page_count = 0
    page_offsets = None
    staged: Path | None = None
    if file_path:
        src = Path(file_path)
        if not src.is_file():
            raise FileNotFoundError(f"file not found: {src}")
        await progress("staging", 5, "מעתיק את הקובץ לאחסון")
        staged = _stage_file(src, spec.staging_root, spec.staging_subdir(inputs))
        await progress("extracting", 15, "מחלץ טקסט מהקובץ")
        try:
            raw_text, page_count, page_offsets = await extractor.extract_text(str(staged))
        except Exception as e:
            await progress("failed", 100, f"כשל בחילוץ טקסט: {e}")
            raise
        raw_text = raw_text or ""
    else:
        raw_text = text or ""

    # Capture the Nevo mini-ratio (editorial holdings summary) BEFORE stripping
    # it out — it is a free professional gold-set for benchmarking halacha
    # extraction (#86.3). Stored on the case_law row below once we have its id.
    # Both the file path and the text-upload path go through the strip.
    nevo_ratio = extractor.extract_nevo_ratio(raw_text)
    raw_text = extractor.strip_nevo_preamble(raw_text).strip()
    if not raw_text:
        await progress("failed", 100, "לא נמצא טקסט בקובץ")
        raise ValueError("no extractable text in file")

    # Step 6: DB create (type-specific, routed — get case_law_id).
    await progress("storing_metadata", 25, "שומר את הרשומה במסד הנתונים")
    display_name = (inputs.get("case_name") or "").strip() or (
        inputs.get(spec.display_name_fallback) or ""
    ).strip()

    # ``full_text`` and ``document_id`` are passed explicitly below; forwarding
    # them again via ``**`` would raise "got multiple values for keyword
    # argument" after the file was already staged and extracted. Drop them
    # from the passthrough, but say so rather than swallowing silently.
    shadowed = sorted(k for k in ("full_text", "document_id") if k in inputs)
    if shadowed:
        logger.warning(
            "ingest_document: ignoring pipeline-owned keys in inputs: %s",
            ", ".join(shadowed),
        )
    reserved = {"case_name", "decision_date", "file_path", "text",
                "full_text", "document_id"}
    passthrough = {k: v for k, v in inputs.items() if k not in reserved}
    record = await spec.create_record(
        full_text=raw_text,
        case_name=display_name,
        decision_date=_coerce_date(inputs.get("decision_date")),
        document_id=document_id,
        **passthrough,
    )
    case_law_id = UUID(str(record["id"]))

    # Persist the captured mini-ratio (best-effort; never block ingest on it).
    if nevo_ratio:
        try:
            await db.update_case_law(case_law_id, nevo_ratio=nevo_ratio)
        except Exception as e:  # noqa: BLE001 — additive metadata, non-fatal
            logger.warning("could not store nevo_ratio for %s: %s", case_law_id, e)

    try:
        # Steps 7-8: chunk + embed + store.
        stored_chunks = await _chunk_embed_store(
            case_law_id, raw_text, page_offsets, page_count, progress,
        )
        await db.mark_indexed(case_law_id)

        # Step 9: multimodal — uniform: flag + PDF + page_count, NOT intake type.
        if (config.MULTIMODAL_ENABLED and page_count > 0
                and staged is not None and staged.suffix.lower() == ".pdf"):
            try:
                await progress("embedding_images", 70, f"מטמיע {page_count} עמודי תמונה (multimodal)")
                await _embed_pages(case_law_id, staged, page_count)
            except Exception as e:
                logger.warning("Multimodal embedding failed (non-fatal): %s", e)

        # Steps 10-12: queue BOTH extractions (GAP-02 fix) + statuses.
        await db.set_case_law_extraction_status(case_law_id, "completed")
        await db.set_case_law_halacha_status(case_law_id, "pending")
        await db.request_metadata_extraction(case_law_id)
        await db.request_halacha_extraction(case_law_id)
        await db.recompute_searchable(case_law_id)

        await progress("completed", 100,
                       f"נקלט: {stored_chunks} chunks. חילוץ הלכות ומטא-דאטה ממתינים בתור.")
        return {
            "status": "completed",
            "case_law_id": str(case_law_id),
            "chunks": stored_chunks,
            "halachot": 0,
            "halachot_pending": True,
            "metadata_filled": [],
            "pages": page_count,
        }
    except Exception as e:
        logger.exception("ingest_document failed (%s): %s", spec.source_kind, e)
        # Marking the row is best-effort: if the DB itself is the problem, a
        # second failure here must not replace the original exception or
        # prevent the caller from being told about the failure.
        try:
            await db.set_case_law_extraction_status(case_law_id, "failed")
        except Exception as status_err:  # noqa: BLE001 — keep the original error
            logger.warning(
                "could not mark case_law %s as failed: %s", case_law_id, status_err,
            )
        await progress("failed", 100, f"כשל בעיבוד: {e}")
        raise
|
|
|
|
|
|
async def _chunk_embed_store(case_law_id, text, page_offsets, page_count, progress) -> int:
    """Steps 7-8: chunk (hierarchical/flat by flag) → embed → store.

    With ``config.PARENT_DOC_RETRIEVAL_ENABLED`` off, the text is chunked flat
    and every chunk is embedded. With it on, only ``child`` chunks are
    embedded, while ``parent`` chunks are stored with no vector.

    Returns:
        0 when chunking yields nothing (nothing is stored). Otherwise the flat
        store call's result, or the ``"children"`` count in hierarchical mode.
    """
    if not config.PARENT_DOC_RETRIEVAL_ENABLED:
        # Flat mode: one embedding per chunk.
        await progress("chunking", 40, f"מחלק את הטקסט ל-chunks ({page_count} עמ')")
        chunks = chunker.chunk_document(text, page_offsets=page_offsets)
        if not chunks:
            return 0
        await progress("embedding", 55, f"מייצר embeddings ל-{len(chunks)} chunks")
        vectors = await embeddings.embed_texts(
            [chunk.content for chunk in chunks], input_type="document",
        )
        flat_rows = []
        for chunk, vector in zip(chunks, vectors):
            flat_rows.append({
                "chunk_index": chunk.chunk_index,
                "content": chunk.content,
                "section_type": chunk.section_type,
                "page_number": chunk.page_number,
                "embedding": vector,
            })
        return await db.store_precedent_chunks(case_law_id, flat_rows)

    # Hierarchical mode: parents carry context, children carry the vectors.
    await progress("chunking", 40, f"מחלק את הטקסט ל-chunks היררכיים ({page_count} עמ')")
    h_chunks = chunker.chunk_document_hierarchical(text, page_offsets=page_offsets)
    if not h_chunks:
        return 0
    children = [node for node in h_chunks if node.role == "child"]
    parents = [node for node in h_chunks if node.role == "parent"]
    await progress("embedding", 55, f"מייצר embeddings ל-{len(children)} children ({len(parents)} parents)")
    child_vectors = await embeddings.embed_texts(
        [node.content for node in children], input_type="document",
    )

    # Parents go first, then children paired with their vectors.
    rows: list[dict] = [
        {
            "role": "parent", "local_id": node.local_id, "parent_local_id": None,
            "chunk_index": node.chunk_index, "content": node.content,
            "section_type": node.section_type, "page_number": node.page_number,
            "embedding": None,
        }
        for node in parents
    ]
    rows.extend(
        {
            "role": "child", "local_id": node.local_id,
            "parent_local_id": node.parent_local_id,
            "chunk_index": node.chunk_index, "content": node.content,
            "section_type": node.section_type, "page_number": node.page_number,
            "embedding": vector,
        }
        for node, vector in zip(children, child_vectors)
    )
    counts = await db.store_precedent_chunks_hierarchical(case_law_id, rows)
    return counts["children"]
|
|
|
|
|
|
async def reindex_case_law(
    case_law_id: "UUID | str",
    progress: ProgressCb | None = None,
) -> dict:
    """Re-chunk + re-embed an existing case_law row from its STORED full_text (GAP-09).

    The stored text is reused as-is: no re-extraction, no re-OCR and no
    LLM/CLI, only the chunker and the embedding service. The original notes
    describe the chunk stores as DELETE-then-INSERT, which would make a repeat
    run idempotent — that behaviour lives in the DB layer, not in this function.

    Raises:
        ValueError: the row does not exist, or has no stored ``full_text``.
    """
    notify = progress if progress else _noop_progress

    if isinstance(case_law_id, UUID):
        cid = case_law_id
    else:
        cid = UUID(str(case_law_id))

    row = await db.get_case_law(cid)
    if not row:
        raise ValueError(f"case_law not found: {cid}")

    stored_text = (row.get("full_text") or "").strip()
    if not stored_text:
        raise ValueError("case_law has no stored full_text to re-index")

    # No page offsets / page count are available when working from stored text.
    stored = await _chunk_embed_store(cid, stored_text, None, 0, notify)
    await db.mark_indexed(cid)
    await notify("completed", 100, f"הוטמע מחדש: {stored} chunks")

    return {
        "status": "completed",
        "case_law_id": str(cid),
        "chunks": stored,
        "reindexed": True,
    }
|