Files
legal-ai/mcp-server/tests/test_unified_ingest.py
Chaim 4f7c3733e2
All checks were successful
G12 Leak-Guard / leak-guard (pull_request) Successful in 5s
fix(ingest): read staged file via storage.ensure_local — s3-only upload 500 (INV-STG1)
תיקון: העלאת פסיקה/החלטת-ועדה (precedent-library + internal-decisions) נכשלה
תחת backend s3-only עם "Package not found at '/data/...docx'" / "Converted file
not found". השורש: ‎`ingest._stage_file` כותב את הקובץ דרך ‎`storage.put_file`
ומחזיר נתיב‎-DATA_DIR, אבל תחת s3-only ה‎-blob נכתב רק ל‎-MinIO ואין עותק בדיסק —
ואז הצינור קרא את הנתיב ישירות מהדיסק (extract_text) → קובץ לא קיים. מסלול
תיקי‎-המקרה לא נפגע כי הוא שומר עותק‎-דיסק + mirror_file; רק מסלול ‎_stage_file
המשותף קרא את ה‎-key כאילו הוא על הדיסק.

התיקון (נרמול‎-במקור, G1; קריאה דרך שכבת‎-האחסון, INV-STG1):
- ‎`_stage_file` מחזיר עכשיו את ה‎-KEY (נתיב יחסי‎-DATA_DIR), לא Path.
- ‎`ingest_document` ו‎-‎`digest_library` מאתרים נתיב‎-קריאה מקומי דרך
  ‎`storage.ensure_local` (עותק‎-דיסק תחת filesystem/dual; הורדה ל‎-temp תחת
  s3-only) ומנקים את ה‎-temp ב‎-finally — בלי דליפה ל‎-/tmp.
- מולטימודל (PDF) קורא את אותו נתיב מקומי מאומת.

בדיקות: test_unified_ingest::test_ingest_reads_via_ensure_local_when_no_disk_copy
מדמה backend ללא עותק‎-דיסק ומוודא שהצינור משלים (נכשל מול הקוד הישן). 55 עוברות.

Invariants: מקיים INV-STG1 (קריאה/כתיבה רק דרך שכבת‎-האחסון), G1 (נרמול‎-במקור,
לא תיקון‎-בקריאה), G2 (אין מסלול מקביל — תיקון הצינור הקנוני).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-12 07:32:04 +00:00

230 lines
8.7 KiB
Python

"""FU-1: unified ingest pipeline tests (offline, all I/O monkeypatched).
Proves both intake types flow through services.ingest.ingest_document and that
the canonical pipeline is symmetric: BOTH metadata and halacha extraction are
queued for BOTH types (GAP-02 regression), enum validation applies to both
(GAP-04), multimodal is gated by flag+PDF not by intake type (GAP-05), and the
external citation guard is preserved.
"""
from __future__ import annotations
import asyncio
from pathlib import Path
from uuid import uuid4
import pytest
from legal_mcp import config
from legal_mcp.services import db, embeddings, chunker, extractor
from legal_mcp.services import ingest, precedent_library, internal_decisions
def _run(coro):
return asyncio.run(coro)
class _Chunk:
def __init__(self, i):
self.chunk_index = i
self.content = f"chunk-{i}"
self.section_type = "body"
self.page_number = 1
self.role = "child"
self.local_id = f"c{i}"
self.parent_local_id = None
@pytest.fixture()
def patched(monkeypatch, tmp_path):
"""Patch every I/O boundary. Record queue + create calls."""
calls = {"metadata": [], "halacha": [], "create": [], "chunks": [], "pages": []}
async def _extract_text(path):
return ("full decision text", 2, [0, 100])
def _strip(text):
return text
def _chunk(text, page_offsets=None):
return [_Chunk(0), _Chunk(1)]
async def _embed(texts, input_type="document"):
return [[0.0] * 8 for _ in texts]
async def _store_chunks(cid, dicts):
calls["chunks"].append((cid, len(dicts)))
return len(dicts)
async def _create_external(**kw):
calls["create"].append(("external", kw))
return {"id": uuid4()}
async def _create_internal(**kw):
calls["create"].append(("internal", kw))
return {"id": uuid4()}
async def _req_meta(cid):
calls["metadata"].append(cid)
async def _req_hal(cid):
calls["halacha"].append(cid)
async def _set_status(cid, status):
return None
async def _recompute_searchable(cid=None):
return 0
monkeypatch.setattr(extractor, "extract_text", _extract_text)
monkeypatch.setattr(extractor, "strip_nevo_preamble", _strip)
monkeypatch.setattr(chunker, "chunk_document", _chunk)
monkeypatch.setattr(embeddings, "embed_texts", _embed)
monkeypatch.setattr(db, "store_precedent_chunks", _store_chunks)
monkeypatch.setattr(db, "create_external_case_law", _create_external)
monkeypatch.setattr(db, "create_internal_committee_decision", _create_internal)
monkeypatch.setattr(db, "request_metadata_extraction", _req_meta)
monkeypatch.setattr(db, "request_halacha_extraction", _req_hal)
monkeypatch.setattr(db, "set_case_law_extraction_status", _set_status)
monkeypatch.setattr(db, "set_case_law_halacha_status", _set_status)
monkeypatch.setattr(db, "recompute_searchable", _recompute_searchable)
monkeypatch.setattr(db, "mark_indexed", _recompute_searchable)
# Force flat chunking + multimodal OFF unless a test flips it.
monkeypatch.setattr(config, "PARENT_DOC_RETRIEVAL_ENABLED", False)
monkeypatch.setattr(config, "MULTIMODAL_ENABLED", False)
return calls
def _make_pdf(tmp_path) -> str:
p = tmp_path / "decision.pdf"
p.write_bytes(b"%PDF-1.4 fake")
return str(p)
def test_internal_queues_BOTH_metadata_and_halacha(patched, tmp_path):
"""GAP-02 regression: the internal path must queue metadata too."""
_run(internal_decisions.ingest_internal_decision(
case_number="8046/24", text="decision text", chair_name="דפנה תמיר",
district="ירושלים", practice_area="betterment_levy",
))
assert len(patched["metadata"]) == 1, "internal path must queue metadata (GAP-02)"
assert len(patched["halacha"]) == 1
def test_external_queues_both(patched, tmp_path):
_run(precedent_library.ingest_precedent(
file_path=_make_pdf(tmp_path), citation="עע\"מ 1234/20",
practice_area="rishuy_uvniya", source_type="court_ruling",
))
assert len(patched["metadata"]) == 1
assert len(patched["halacha"]) == 1
def test_both_types_go_through_ingest_document(patched, tmp_path, monkeypatch):
seen = []
real = ingest.ingest_document
async def _spy(spec, **kw):
seen.append(spec.source_kind)
return await real(spec, **kw)
monkeypatch.setattr(ingest, "ingest_document", _spy)
_run(internal_decisions.ingest_internal_decision(
case_number="8046/24", text="t", chair_name="דפנה תמיר", practice_area="betterment_levy"))
_run(precedent_library.ingest_precedent(
file_path=_make_pdf(tmp_path), citation="עע\"מ 1/20", practice_area="rishuy_uvniya"))
assert seen == ["internal_committee", "external_upload"]
def test_enum_validation_rejects_bad_practice_area_internal(patched, tmp_path):
"""GAP-04: internal path must validate enums like the external one."""
with pytest.raises(ValueError, match="practice_area"):
_run(internal_decisions.ingest_internal_decision(
case_number="8046/24", text="t", chair_name="x", practice_area="bogus"))
def test_enum_validation_rejects_bad_practice_area_external(patched, tmp_path):
with pytest.raises(ValueError, match="practice_area"):
_run(precedent_library.ingest_precedent(
file_path=_make_pdf(tmp_path), citation="עע\"מ 1/20", practice_area="bogus"))
def test_external_citation_guard_still_blocks_arar(patched, tmp_path):
with pytest.raises(ValueError, match="ערר"):
_run(precedent_library.ingest_precedent(
file_path=_make_pdf(tmp_path), citation="ערר 1234/24"))
def test_internal_text_path_works_without_file(patched):
out = _run(internal_decisions.ingest_internal_decision(
case_number="8046/24", text="t", chair_name="x", practice_area="betterment_levy"))
assert out["status"] == "completed"
assert out["case_law_id"]
def test_internal_requires_file_or_text(patched):
with pytest.raises(ValueError, match="file_path or text"):
_run(internal_decisions.ingest_internal_decision(
case_number="8046/24", chair_name="x", practice_area="betterment_levy"))
def test_display_name_fallback_uses_canonical_id(patched, tmp_path):
_run(internal_decisions.ingest_internal_decision(
case_number="8046/24", text="t", chair_name="x", practice_area="betterment_levy"))
kind, kw = patched["create"][0]
assert kw["case_name"] == "8046/24", "missing case_name falls back to canonical id"
def test_ingest_reads_via_ensure_local_when_no_disk_copy(patched, tmp_path, monkeypatch):
"""Regression: under the s3-only backend the staged key has NO on-disk file,
so reading the DATA_DIR path directly 500'd ('Package not found at …').
ingest must resolve a readable local path via storage.ensure_local (a temp
download) and clean it up afterwards.
"""
from legal_mcp.services import storage
store: dict[str, bytes] = {}
class _MemBackend(storage.StorageBackend):
"""In-memory backend with NO local copy — mimics s3-only."""
name = "mem"
async def put_bytes(self, key, data, *, bucket=storage.Bucket.DOCUMENTS,
content_type=None, metadata=None):
store[storage.normalize_key(key)] = bytes(data)
return f"mem://{key}"
async def put_file(self, src, key, *, bucket=storage.Bucket.DOCUMENTS,
content_type=None, metadata=None):
store[storage.normalize_key(key)] = Path(src).read_bytes()
return f"mem://{key}"
async def get_bytes(self, key, *, bucket=storage.Bucket.DOCUMENTS):
return store[storage.normalize_key(key)]
async def exists(self, key, *, bucket=storage.Bucket.DOCUMENTS):
return storage.normalize_key(key) in store
def local_path(self, key, *, bucket=storage.Bucket.DOCUMENTS):
return None # the s3-only condition: nothing on disk
monkeypatch.setattr(storage, "_singleton", _MemBackend())
# An extract_text that actually READS the path it is handed — proves ingest
# passes a real, readable local file rather than a phantom DATA_DIR path.
seen = []
async def _extract_reads(path):
seen.append(path)
assert Path(path).read_bytes() == b"%PDF-1.4 fake"
return ("full decision text", 1, [0])
monkeypatch.setattr(extractor, "extract_text", _extract_reads)
out = _run(precedent_library.ingest_precedent(
file_path=_make_pdf(tmp_path), citation="עע\"מ 1234/20",
practice_area="rishuy_uvniya", source_type="court_ruling",
))
assert out["status"] == "completed"
# the temp download was cleaned up (no /tmp leak)
assert seen and not Path(seen[0]).exists()