Files
legal-ai/mcp-server/src/legal_mcp/services/ingest.py
Chaim 4f7c3733e2
All checks were successful
G12 Leak-Guard / leak-guard (pull_request) Successful in 5s
fix(ingest): read staged file via storage.ensure_local — s3-only upload 500 (INV-STG1)
תיקון: העלאת פסיקה/החלטת-ועדה (precedent-library + internal-decisions) נכשלה
תחת backend s3-only עם "Package not found at '/data/...docx'" / "Converted file
not found". השורש: ‎`ingest._stage_file` כותב את הקובץ דרך ‎`storage.put_file`
ומחזיר נתיב‎-DATA_DIR, אבל תחת s3-only ה‎-blob נכתב רק ל‎-MinIO ואין עותק בדיסק —
ואז הצינור קרא את הנתיב ישירות מהדיסק (extract_text) → קובץ לא קיים. מסלול
תיקי‎-המקרה לא נפגע כי הוא שומר עותק‎-דיסק + mirror_file; רק מסלול ‎_stage_file
המשותף קרא את ה‎-key כאילו הוא על הדיסק.

התיקון (נרמול‎-במקור, G1; קריאה דרך שכבת‎-האחסון, INV-STG1):
- ‎`_stage_file` מחזיר עכשיו את ה‎-KEY (נתיב יחסי‎-DATA_DIR), לא Path.
- ‎`ingest_document` ו‎-‎`digest_library` מאתרים נתיב‎-קריאה מקומי דרך
  ‎`storage.ensure_local` (עותק‎-דיסק תחת filesystem/dual; הורדה ל‎-temp תחת
  s3-only) ומנקים את ה‎-temp ב‎-finally — בלי דליפה ל‎-/tmp.
- מולטימודל (PDF) קורא את אותו נתיב מקומי מאומת.

בדיקות: test_unified_ingest::test_ingest_reads_via_ensure_local_when_no_disk_copy
מדמה backend ללא עותק‎-דיסק ומוודא שהצינור משלים (נכשל מול הקוד הישן). 55 עוברות.

Invariants: מקיים INV-STG1 (קריאה/כתיבה רק דרך שכבת‎-האחסון), G1 (נרמול‎-במקור,
לא תיקון‎-בקריאה), G2 (אין מסלול מקביל — תיקון הצינור הקנוני).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-12 07:32:04 +00:00

326 lines
14 KiB
Python

"""Canonical ingest pipeline (FU-1).
One pipeline for all sibling-entity intake types (external precedent,
internal committee decision). Per-type variation rides on an ``IntakeSpec``
config object — never a parallel function. See
docs/spec/01-ingest.md and docs/superpowers/specs/2026-05-30-fu1-unified-ingest-design.md.
claude_session rule preserved: this module only QUEUES extraction
(``request_*_extraction`` = pure DB writes). It never imports
halacha_extractor / precedent_metadata_extractor, so it is safe to call
from the FastAPI container where the ``claude`` CLI is unavailable.
"""
from __future__ import annotations
import asyncio
import logging
import mimetypes
import re
from dataclasses import dataclass
from datetime import date
from pathlib import Path
from typing import Awaitable, Callable
from uuid import UUID, uuid4
from legal_mcp import config
from legal_mcp.services import chunker, db, embeddings, extractor, storage
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Async progress callback signature: (status, percent, message) -> awaitable.
# In this module it is awaited as e.g. progress("staging", 5, "<message>").
ProgressCb = Callable[[str, int, str], Awaitable[None]]
async def _noop_progress(_status: str, _percent: int, _msg: str) -> None:
return None
@dataclass(frozen=True)
class IntakeSpec:
    """Describes everything that varies between intake types.

    One immutable instance per intake type (e.g. external precedent, internal
    committee decision) is handed to the shared pipeline, so per-type
    behaviour is configuration rather than a parallel function.
    """
    # Label for the intake type; used in the pipeline's failure log line.
    source_kind: str
    # Name of the type's identifier field. Not read by the code in this
    # module; NOTE(review): confirm how callers use it.
    id_field: str
    # Root directory for staged uploads. Must lie under config.DATA_DIR: the
    # storage key is derived with Path.relative_to(config.DATA_DIR).
    staging_root: Path
    # Maps the (already derived) inputs to a sub-directory under
    # staging_root; an empty result falls back to "other".
    staging_subdir: Callable[[dict], str]
    # Type-specific validation of the raw inputs. Called for its side effect
    # only (return value ignored); presumably raises on invalid input.
    validate: Callable[[dict], None]
    # Field name -> allowed values. A missing or falsy input value is
    # checked as "".
    enum_fields: dict[str, frozenset[str]]
    # Returns extra fields merged over the inputs (derived keys win);
    # identity for external precedents.
    derive: Callable[[dict], dict]
    # Key in inputs used as the display name when "case_name" is empty.
    display_name_fallback: str
    # Async record creator. Awaited with keyword arguments only (full_text,
    # case_name, decision_date, document_id, plus the remaining input
    # fields); must return a dict containing "id".
    create_record: Callable[..., Awaitable[dict]]
def _coerce_date(value) -> date | None:
if value is None or value == "":
return None
if isinstance(value, date):
return value
if isinstance(value, str):
try:
return date.fromisoformat(value[:10])
except ValueError:
return None
return None
def _safe_filename(name: str) -> str:
base = Path(name).name
return re.sub(r"[^\w.\-+א-ת ]", "_", base) or f"upload-{uuid4().hex[:8]}"
async def _stage_file(src_path: Path, root: Path, subdir: str) -> str:
    """Upload an intake file into the DOCUMENTS bucket and return its storage key.

    The key is ``<root>/<subdir or "other">/<8 hex>_<sanitized name>``, made
    relative to ``config.DATA_DIR`` and written with POSIX separators.

    The blob is written only through ``storage.put_file`` (INV-STG1), so the
    key may have no on-disk counterpart (e.g. under the s3-only backend).
    Callers obtain a readable path with ``storage.ensure_local``.

    The unsanitized original filename travels as the ``filename`` metadata
    entry, never inside the key (INV-STG2).

    Raises ValueError (from ``Path.relative_to``) when ``root`` is not under
    ``config.DATA_DIR``.
    """
    original_name = src_path.name
    target_dir = root / (subdir or "other")
    unique_name = f"{uuid4().hex[:8]}_{_safe_filename(original_name)}"
    storage_key = (target_dir / unique_name).relative_to(config.DATA_DIR).as_posix()
    guessed_type, _encoding = mimetypes.guess_type(original_name)
    await storage.put_file(
        src_path,
        storage_key,
        bucket=storage.Bucket.DOCUMENTS,
        content_type=guessed_type,
        metadata={"filename": original_name},
    )
    return storage_key
def _validate_enums(spec: IntakeSpec, inputs: dict) -> None:
for field_name, allowed in spec.enum_fields.items():
value = inputs.get(field_name, "") or ""
if value not in allowed:
raise ValueError(f"invalid {field_name}: {value!r}")
async def _embed_pages(case_law_id: UUID, pdf_path: Path, page_count: int) -> dict:
    """Render PDF pages, embed them with the multimodal model, and store them.

    Args:
        case_law_id: row the page-image embeddings belong to; also selects the
            thumbnail directory (``spec_thumb_dir``).
        pdf_path: readable local path of the PDF.
        page_count: page count reported by text extraction. Only used in the
            diagnostic warning; the renderer decides which pages are embedded.

    Returns:
        ``{"pages_embedded": <count returned by the DB store>}``.

    Errors propagate to the caller, which treats multimodal as non-fatal.
    """
    thumb_dir = spec_thumb_dir(case_law_id)
    # Rendering is synchronous; run it in a worker thread to keep the event
    # loop responsive.
    rendered = await asyncio.to_thread(
        extractor.render_pages_for_multimodal,
        pdf_path, config.MULTIMODAL_DPI, config.MULTIMODAL_THUMB_DPI, thumb_dir,
    )
    images = [pil for pil, _ in rendered]
    thumbs = [t for _, t in rendered]
    img_embs = list(await embeddings.embed_images(images))
    if len(img_embs) != len(images):
        # zip() below truncates to the shorter sequence; surface the mismatch
        # instead of silently losing (or mis-numbering) pages.
        logger.warning(
            "Multimodal: %d page images rendered (extractor reported %d pages) "
            "but %d embeddings returned for case_law %s",
            len(images), page_count, len(img_embs), case_law_id,
        )
    page_records = []
    # NOTE(review): page numbers assume the renderer yields pages in document
    # order starting at page 1 — confirm against render_pages_for_multimodal.
    for page_number, (emb, thumb) in enumerate(zip(img_embs, thumbs), start=1):
        rel_thumb = None
        if thumb is not None:
            try:
                rel_thumb = str(thumb.relative_to(config.DATA_DIR))
            except ValueError:
                # Thumbnail outside DATA_DIR: keep the absolute path.
                rel_thumb = str(thumb)
        page_records.append({
            "page_number": page_number, "embedding": emb, "image_thumbnail_path": rel_thumb,
        })
    stored = await db.store_precedent_image_embeddings(
        case_law_id, page_records, model_name=config.MULTIMODAL_MODEL,
    )
    logger.info("Multimodal: stored %d page-image embeddings for case_law %s", stored, case_law_id)
    return {"pages_embedded": stored}
def spec_thumb_dir(case_law_id: UUID) -> Path:
    """Return the page-thumbnail directory for ``case_law_id``.

    Every intake type shares the same location:
    ``DATA_DIR/precedent-library/thumbnails/<case_law_id>``.
    """
    return Path(config.DATA_DIR).joinpath("precedent-library", "thumbnails", str(case_law_id))
async def ingest_document(
    spec: IntakeSpec,
    *,
    inputs: dict,
    file_path: str | Path | None = None,
    text: str | None = None,
    document_id: UUID | None = None,
    progress: ProgressCb | None = None,
) -> dict:
    """Run the canonical 12-step pipeline for one intake item.

    ``inputs`` carries the type-specific record fields (citation/case_number,
    case_name, court, practice_area, etc.). ``spec`` decides how they are
    validated, staged, derived, and which DB-create runs.

    Text source: when ``file_path`` is given, the file is staged through the
    storage layer and its text is extracted from a local copy resolved via
    ``storage.ensure_local``. Otherwise ``text`` is ingested directly.

    Returns a dict with ``status``, ``case_law_id``, ``chunks``, ``halachot``,
    ``halachot_pending``, ``metadata_filled`` and ``pages``.

    Raises:
        ValueError: neither ``file_path`` nor ``text`` was given, an enum
            field is invalid, or no text remains after stripping.
        FileNotFoundError: ``file_path`` is not a file, or the staged blob
            could not be resolved to a local path.
        Anything raised by ``spec.validate``, staging, extraction, record
        creation or the chunk/embed/DB steps is propagated. Extraction
        failures, empty text and failures after the record exists also emit
        ``progress("failed", ...)`` first.
    """
    progress = progress or _noop_progress
    # Keys passed to spec.create_record explicitly (or that only steer this
    # function). Forwarding them again from ``inputs`` would raise
    # "got multiple values for keyword argument".
    reserved_keys = {"case_name", "decision_date", "file_path", "text",
                     "full_text", "document_id"}
    # Step 1: input validation (type-specific) + enums (uniform mechanism).
    if not file_path and text is None:
        raise ValueError("either file_path or text is required")
    spec.validate(inputs)
    _validate_enums(spec, inputs)
    # Step 2: field derivation (identity for external); derived keys override.
    inputs = {**inputs, **spec.derive(inputs)}
    # Steps 3-5: stage (if file) + extract + strip.
    page_count = 0
    page_offsets = None
    staged: Path | None = None
    staged_is_tmp = False
    is_pdf = False
    if file_path:
        src = Path(file_path)
        if not src.is_file():
            raise FileNotFoundError(f"file not found: {src}")
        await progress("staging", 5, "מעתיק את הקובץ לאחסון")
        staged_key = await _stage_file(src, spec.staging_root, spec.staging_subdir(inputs))
        # Judge the file type by the storage key, which always ends with the
        # sanitized original filename. A temp copy handed back by
        # ensure_local (s3-only) is not guaranteed to keep the extension.
        is_pdf = Path(staged_key).suffix.lower() == ".pdf"
        # Resolve a real local path to read from. Under filesystem/dual this is
        # the on-disk copy. Under s3-only the blob lives only in object
        # storage, so ensure_local downloads it to a temp file we own and must
        # clean up (INV-STG1 — the pipeline must read through the storage
        # layer, never assume the key maps to an existing DATA_DIR file).
        staged_is_tmp = storage.local_path(
            staged_key, bucket=storage.Bucket.DOCUMENTS) is None
        staged = await storage.ensure_local(
            staged_key, bucket=storage.Bucket.DOCUMENTS)
        if staged is None:
            # Fail loudly. Otherwise the upload would silently fall through to
            # the ``text`` branch below and surface as "no extractable text".
            raise FileNotFoundError(f"staged file not readable: {staged_key}")
    try:
        if staged is not None:
            await progress("extracting", 15, "מחלץ טקסט מהקובץ")
            try:
                raw_text, page_count, page_offsets = await extractor.extract_text(str(staged))
            except Exception as e:
                await progress("failed", 100, f"כשל בחילוץ טקסט: {e}")
                raise
            raw_text = raw_text or ""
        else:
            raw_text = text or ""
        # Capture the Nevo mini-ratio (editorial holdings summary) BEFORE
        # stripping it out. It is a free professional gold-set for
        # benchmarking halacha extraction (#86.3), and is stored on the
        # case_law row below once we have its id.
        nevo_ratio = extractor.extract_nevo_ratio(raw_text)
        raw_text = extractor.strip_nevo_preamble(raw_text).strip()
        if not raw_text:
            await progress("failed", 100, "לא נמצא טקסט בקובץ")
            raise ValueError("no extractable text in file")
        # Step 6: DB create (type-specific, routed — get case_law_id).
        await progress("storing_metadata", 25, "שומר את הרשומה במסד הנתונים")
        display_name = (inputs.get("case_name") or "").strip() or (
            inputs.get(spec.display_name_fallback) or ""
        ).strip()
        record = await spec.create_record(
            full_text=raw_text,
            case_name=display_name,
            decision_date=_coerce_date(inputs.get("decision_date")),
            document_id=document_id,
            **{k: v for k, v in inputs.items() if k not in reserved_keys},
        )
        case_law_id = UUID(str(record["id"]))
        # Persist the captured mini-ratio (best-effort; never block ingest on it).
        if nevo_ratio:
            try:
                await db.update_case_law(case_law_id, nevo_ratio=nevo_ratio)
            except Exception as e:  # noqa: BLE001 — additive metadata, non-fatal
                logger.warning("could not store nevo_ratio for %s: %s", case_law_id, e)
        try:
            stored_chunks = await _chunk_embed_store(case_law_id, raw_text, page_offsets, page_count, progress)
            await db.mark_indexed(case_law_id)
            # Step 9: multimodal — uniform: flag + PDF + page_count, NOT intake type.
            if (config.MULTIMODAL_ENABLED and page_count > 0
                    and staged is not None and is_pdf):
                try:
                    await progress("embedding_images", 70, f"מטמיע {page_count} עמודי תמונה (multimodal)")
                    await _embed_pages(case_law_id, staged, page_count)
                except Exception as e:
                    logger.warning("Multimodal embedding failed (non-fatal): %s", e)
            # Steps 10-12: queue BOTH extractions (GAP-02 fix) + statuses.
            await db.set_case_law_extraction_status(case_law_id, "completed")
            await db.set_case_law_halacha_status(case_law_id, "pending")
            await db.request_metadata_extraction(case_law_id)
            await db.request_halacha_extraction(case_law_id)
            await db.recompute_searchable(case_law_id)
            await progress("completed", 100,
                           f"נקלט: {stored_chunks} chunks. חילוץ הלכות ומטא-דאטה ממתינים בתור.")
            return {
                "status": "completed",
                "case_law_id": str(case_law_id),
                "chunks": stored_chunks,
                "halachot": 0,
                "halachot_pending": True,
                "metadata_filled": [],
                "pages": page_count,
            }
        except Exception as e:
            logger.exception("ingest_document failed (%s): %s", spec.source_kind, e)
            # Best-effort. A failure here must not replace the original error
            # or skip the "failed" progress event.
            try:
                await db.set_case_law_extraction_status(case_law_id, "failed")
            except Exception as status_err:  # noqa: BLE001 — status write is secondary
                logger.warning("could not mark case_law %s as failed: %s", case_law_id, status_err)
            await progress("failed", 100, f"כשל בעיבוד: {e}")
            raise
    finally:
        # Drop the temp download (s3-only). On filesystem/dual ``staged`` is
        # the canonical on-disk copy and must NOT be removed.
        if staged_is_tmp and staged is not None:
            try:
                staged.unlink(missing_ok=True)
            except OSError as e:  # noqa: BLE001 — temp cleanup, never fatal
                logger.debug("could not remove temp staged file %s: %s", staged, e)
async def _chunk_embed_store(case_law_id, text, page_offsets, page_count, progress) -> int:
    """Steps 7-8: split ``text`` into chunks, embed them, and persist them.

    The mode is chosen by ``config.PARENT_DOC_RETRIEVAL_ENABLED``:

    * disabled — flat chunking. Every chunk is embedded, and the value
      returned by ``db.store_precedent_chunks`` is passed through.
    * enabled — hierarchical chunking. Only ``child`` chunks are embedded;
      ``parent`` chunks are stored with ``embedding=None`` (parents listed
      first, then children). The ``"children"`` count from the store is
      returned.

    Returns 0 without embedding or storing when the chunker yields nothing.
    ``progress`` receives a "chunking" (40) and an "embedding" (55) event.
    """
    if not config.PARENT_DOC_RETRIEVAL_ENABLED:
        await progress("chunking", 40, f"מחלק את הטקסט ל-chunks ({page_count} עמ')")
        flat_chunks = chunker.chunk_document(text, page_offsets=page_offsets)
        if not flat_chunks:
            return 0
        await progress("embedding", 55, f"מייצר embeddings ל-{len(flat_chunks)} chunks")
        vectors = await embeddings.embed_texts(
            [chunk.content for chunk in flat_chunks], input_type="document",
        )
        rows = []
        for chunk, vector in zip(flat_chunks, vectors):
            rows.append({
                "chunk_index": chunk.chunk_index,
                "content": chunk.content,
                "section_type": chunk.section_type,
                "page_number": chunk.page_number,
                "embedding": vector,
            })
        return await db.store_precedent_chunks(case_law_id, rows)

    await progress("chunking", 40, f"מחלק את הטקסט ל-chunks היררכיים ({page_count} עמ')")
    nodes = chunker.chunk_document_hierarchical(text, page_offsets=page_offsets)
    if not nodes:
        return 0
    child_nodes = [node for node in nodes if node.role == "child"]
    parent_nodes = [node for node in nodes if node.role == "parent"]
    await progress(
        "embedding", 55,
        f"מייצר embeddings ל-{len(child_nodes)} children ({len(parent_nodes)} parents)",
    )
    child_vectors = await embeddings.embed_texts(
        [node.content for node in child_nodes], input_type="document",
    )
    hierarchy_rows = [
        {
            "role": "parent",
            "local_id": node.local_id,
            "parent_local_id": None,
            "chunk_index": node.chunk_index,
            "content": node.content,
            "section_type": node.section_type,
            "page_number": node.page_number,
            "embedding": None,
        }
        for node in parent_nodes
    ]
    hierarchy_rows.extend(
        {
            "role": "child",
            "local_id": node.local_id,
            "parent_local_id": node.parent_local_id,
            "chunk_index": node.chunk_index,
            "content": node.content,
            "section_type": node.section_type,
            "page_number": node.page_number,
            "embedding": vector,
        }
        for node, vector in zip(child_nodes, child_vectors)
    )
    counts = await db.store_precedent_chunks_hierarchical(case_law_id, hierarchy_rows)
    return counts["children"]
async def reindex_case_law(
    case_law_id: "UUID | str",
    progress: ProgressCb | None = None,
) -> dict:
    """Rebuild the chunks and embeddings of an existing case_law row (GAP-09).

    Works from the row's STORED ``full_text``: nothing is re-extracted or
    re-OCRed (see feedback_no_reocr_retrofit). Only the chunker and text
    embeddings run — no LLM/CLI — so it is safe to call from any container.
    Re-running is idempotent because the chunk stores delete before insert.

    Raises ValueError when the row is missing or has no stored text, or when
    ``case_law_id`` is not a valid UUID string.
    """
    report = progress if progress else _noop_progress
    if isinstance(case_law_id, UUID):
        cid = case_law_id
    else:
        cid = UUID(str(case_law_id))
    row = await db.get_case_law(cid)
    if not row:
        raise ValueError(f"case_law not found: {cid}")
    stored_text = row.get("full_text") or ""
    stored_text = stored_text.strip()
    if not stored_text:
        raise ValueError("case_law has no stored full_text to re-index")
    chunk_count = await _chunk_embed_store(cid, stored_text, None, 0, report)
    await db.mark_indexed(cid)
    await report("completed", 100, f"הוטמע מחדש: {chunk_count} chunks")
    return {"status": "completed", "case_law_id": str(cid), "chunks": chunk_count, "reindexed": True}