From 4f7c3733e272ef11cd4b07b9c76b563c48c4d925 Mon Sep 17 00:00:00 2001 From: Chaim Date: Fri, 12 Jun 2026 07:32:04 +0000 Subject: [PATCH] =?UTF-8?q?fix(ingest):=20read=20staged=20file=20via=20sto?= =?UTF-8?q?rage.ensure=5Flocal=20=E2=80=94=20s3-only=20upload=20500=20(INV?= =?UTF-8?q?-STG1)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit תיקון: העלאת פסיקה/החלטת-ועדה (precedent-library + internal-decisions) נכשלה תחת backend s3-only עם "Package not found at '/data/...docx'" / "Converted file not found". השורש: ‎`ingest._stage_file` כותב את הקובץ דרך ‎`storage.put_file` ומחזיר נתיב‎-DATA_DIR, אבל תחת s3-only ה‎-blob נכתב רק ל‎-MinIO ואין עותק בדיסק — ואז הצינור קרא את הנתיב ישירות מהדיסק (extract_text) → קובץ לא קיים. מסלול תיקי‎-המקרה לא נפגע כי הוא שומר עותק‎-דיסק + mirror_file; רק מסלול ‎_stage_file המשותף קרא את ה‎-key כאילו הוא על הדיסק. התיקון (נרמול‎-במקור, G1; קריאה דרך שכבת‎-האחסון, INV-STG1): - ‎`_stage_file` מחזיר עכשיו את ה‎-KEY (נתיב יחסי‎-DATA_DIR), לא Path. - ‎`ingest_document` ו‎-‎`digest_library` מאתרים נתיב‎-קריאה מקומי דרך ‎`storage.ensure_local` (עותק‎-דיסק תחת filesystem/dual; הורדה ל‎-temp תחת s3-only) ומנקים את ה‎-temp ב‎-finally — בלי דליפה ל‎-/tmp. - מולטימודל (PDF) קורא את אותו נתיב מקומי מאומת. בדיקות: test_unified_ingest::test_ingest_reads_via_ensure_local_when_no_disk_copy מדמה backend ללא עותק‎-דיסק ומוודא שהצינור משלים (נכשל מול הקוד הישן). 55 עוברות. Invariants: מקיים INV-STG1 (קריאה/כתיבה רק דרך שכבת‎-האחסון), G1 (נרמול‎-במקור, לא תיקון‎-בקריאה), G2 (אין מסלול מקביל — תיקון הצינור הקנוני). Co-Authored-By: Claude Opus 4.8 (1M context) --- .../src/legal_mcp/services/digest_library.py | 21 ++- mcp-server/src/legal_mcp/services/ingest.py | 177 ++++++++++-------- mcp-server/tests/test_storage_staging.py | 16 +- mcp-server/tests/test_unified_ingest.py | 55 ++++++ 4 files changed, 180 insertions(+), 89 deletions(-) diff --git a/mcp-server/src/legal_mcp/services/digest_library.py b/mcp-server/src/legal_mcp/services/digest_library.py index af9bfa5..f69a8ac 100644 --- a/mcp-server/src/legal_mcp/services/digest_library.py +++ b/mcp-server/src/legal_mcp/services/digest_library.py @@ -30,7 +30,7 @@ from typing import Awaitable, Callable from uuid import UUID from legal_mcp import config -from legal_mcp.services import db, embeddings, extractor, ingest +from legal_mcp.services import db, embeddings, extractor, ingest, storage logger = logging.getLogger(__name__) @@ -144,12 +144,23 @@ async def create_pending_digest( raise ValueError(f"file not found: {file_path}") await progress("staging", 10, "מעתיק קובץ") - staged = await ingest._stage_file(src, DIGEST_LIBRARY_DIR, "incoming") - rel_path = str(staged.relative_to(config.DATA_DIR)) \ - if str(staged).startswith(str(config.DATA_DIR)) else str(staged) + # ``_stage_file`` returns the storage KEY (DATA_DIR-relative path). Resolve a + # real local path to read from — on s3-only this downloads to a temp file we + # own and remove after extraction (INV-STG1; the key is not guaranteed to be + # on local disk). + rel_path = await ingest._stage_file(src, DIGEST_LIBRARY_DIR, "incoming") + local = await storage.ensure_local(rel_path, bucket=storage.Bucket.DOCUMENTS) + local_is_tmp = storage.local_path(rel_path, bucket=storage.Bucket.DOCUMENTS) is None await progress("extracting_text", 50, "מחלץ טקסט") - raw_text, _pc, _off = await extractor.extract_text(str(staged)) + try: + raw_text, _pc, _off = await extractor.extract_text(str(local)) + finally: + if local_is_tmp: + try: + local.unlink(missing_ok=True) + except OSError as e: # noqa: BLE001 — temp cleanup, never fatal + logger.debug("could not remove temp digest file %s: %s", local, e) raw_text = (raw_text or "").strip() if not raw_text: raise ValueError("no text extracted from digest") diff --git a/mcp-server/src/legal_mcp/services/ingest.py b/mcp-server/src/legal_mcp/services/ingest.py index fabd9b6..e34a34d 100644 --- a/mcp-server/src/legal_mcp/services/ingest.py +++ b/mcp-server/src/legal_mcp/services/ingest.py @@ -66,13 +66,14 @@ def _safe_filename(name: str) -> str: return re.sub(r"[^\w.\-+א-ת ]", "_", base) or f"upload-{uuid4().hex[:8]}" -async def _stage_file(src_path: Path, root: Path, subdir: str) -> Path: +async def _stage_file(src_path: Path, root: Path, subdir: str) -> str: """Stage an intake file through the unified storage layer (INV-STG1). - Returns the DATA_DIR path the rest of the pipeline reads from — under the - filesystem/dual backends the bytes are on disk and the key is the - DATA_DIR-relative path. The Hebrew original filename rides as object - metadata, never as the key (INV-STG2).""" + Returns the storage KEY (DATA_DIR-relative path) the blob was written under. + The caller resolves a readable local path via ``storage.ensure_local`` — the + key is NOT guaranteed to map to an existing on-disk file (under the s3-only + backend the bytes live only in object storage). The Hebrew original filename + rides as object metadata, never as the key (INV-STG2).""" dest = root / (subdir or "other") / f"{uuid4().hex[:8]}_{_safe_filename(src_path.name)}" key = dest.relative_to(config.DATA_DIR).as_posix() await storage.put_file( @@ -80,7 +81,7 @@ async def _stage_file(src_path: Path, root: Path, subdir: str) -> Path: content_type=mimetypes.guess_type(src_path.name)[0], metadata={"filename": src_path.name}, ) - return dest + return key def _validate_enums(spec: IntakeSpec, inputs: dict) -> None: @@ -155,88 +156,108 @@ async def ingest_document( page_count = 0 page_offsets = None staged: Path | None = None + staged_is_tmp = False if file_path: src = Path(file_path) if not src.is_file(): raise FileNotFoundError(f"file not found: {src}") await progress("staging", 5, "מעתיק את הקובץ לאחסון") - staged = await _stage_file(src, spec.staging_root, spec.staging_subdir(inputs)) - await progress("extracting", 15, "מחלץ טקסט מהקובץ") - try: - raw_text, page_count, page_offsets = await extractor.extract_text(str(staged)) - except Exception as e: - await progress("failed", 100, f"כשל בחילוץ טקסט: {e}") - raise - raw_text = (raw_text or "") - else: - raw_text = (text or "") - # Capture the Nevo מיני-רציו (editorial holdings summary) BEFORE stripping - # it out — it is a free professional gold-set for benchmarking halacha - # extraction (#86.3). Stored on the case_law row below once we have its id. - nevo_ratio = extractor.extract_nevo_ratio(raw_text) - raw_text = extractor.strip_nevo_preamble(raw_text).strip() - if not raw_text: - await progress("failed", 100, "לא נמצא טקסט בקובץ") - raise ValueError("no extractable text in file") - - # Step 6: DB create (type-specific, routed — get case_law_id). - await progress("storing_metadata", 25, "שומר את הרשומה במסד הנתונים") - display_name = (inputs.get("case_name") or "").strip() or ( - inputs.get(spec.display_name_fallback) or "" - ).strip() - record = await spec.create_record( - full_text=raw_text, - case_name=display_name, - decision_date=_coerce_date(inputs.get("decision_date")), - document_id=document_id, - **{k: v for k, v in inputs.items() - if k not in {"case_name", "decision_date", "file_path", "text"}}, - ) - case_law_id = UUID(str(record["id"])) - - # Persist the captured mini-ratio (best-effort; never block ingest on it). - if nevo_ratio: - try: - await db.update_case_law(case_law_id, nevo_ratio=nevo_ratio) - except Exception as e: # noqa: BLE001 — additive metadata, non-fatal - logger.warning("could not store nevo_ratio for %s: %s", case_law_id, e) - + staged_key = await _stage_file(src, spec.staging_root, spec.staging_subdir(inputs)) + # Resolve a real local path to read from. Under filesystem/dual this is + # the on-disk copy; under s3-only the blob lives only in object storage, + # so ensure_local downloads it to a temp file we own and must clean up + # (INV-STG1 — the pipeline must read through the storage layer, never + # assume the key maps to an existing DATA_DIR file). + staged_is_tmp = storage.local_path( + staged_key, bucket=storage.Bucket.DOCUMENTS) is None + staged = await storage.ensure_local( + staged_key, bucket=storage.Bucket.DOCUMENTS) try: - stored_chunks = await _chunk_embed_store(case_law_id, raw_text, page_offsets, page_count, progress) - await db.mark_indexed(case_law_id) - - # Step 9: multimodal — uniform: flag + PDF + page_count, NOT intake type. - if (config.MULTIMODAL_ENABLED and page_count > 0 - and staged is not None and staged.suffix.lower() == ".pdf"): + if staged is not None: + await progress("extracting", 15, "מחלץ טקסט מהקובץ") try: - await progress("embedding_images", 70, f"מטמיע {page_count} עמודי תמונה (multimodal)") - await _embed_pages(case_law_id, staged, page_count) + raw_text, page_count, page_offsets = await extractor.extract_text(str(staged)) except Exception as e: - logger.warning("Multimodal embedding failed (non-fatal): %s", e) + await progress("failed", 100, f"כשל בחילוץ טקסט: {e}") + raise + raw_text = (raw_text or "") + else: + raw_text = (text or "") + # Capture the Nevo מיני-רציו (editorial holdings summary) BEFORE stripping + # it out — it is a free professional gold-set for benchmarking halacha + # extraction (#86.3). Stored on the case_law row below once we have its id. + nevo_ratio = extractor.extract_nevo_ratio(raw_text) + raw_text = extractor.strip_nevo_preamble(raw_text).strip() + if not raw_text: + await progress("failed", 100, "לא נמצא טקסט בקובץ") + raise ValueError("no extractable text in file") - # Steps 10-12: queue BOTH extractions (GAP-02 fix) + statuses. - await db.set_case_law_extraction_status(case_law_id, "completed") - await db.set_case_law_halacha_status(case_law_id, "pending") - await db.request_metadata_extraction(case_law_id) - await db.request_halacha_extraction(case_law_id) - await db.recompute_searchable(case_law_id) + # Step 6: DB create (type-specific, routed — get case_law_id). + await progress("storing_metadata", 25, "שומר את הרשומה במסד הנתונים") + display_name = (inputs.get("case_name") or "").strip() or ( + inputs.get(spec.display_name_fallback) or "" + ).strip() + record = await spec.create_record( + full_text=raw_text, + case_name=display_name, + decision_date=_coerce_date(inputs.get("decision_date")), + document_id=document_id, + **{k: v for k, v in inputs.items() + if k not in {"case_name", "decision_date", "file_path", "text"}}, + ) + case_law_id = UUID(str(record["id"])) - await progress("completed", 100, - f"נקלט: {stored_chunks} chunks. חילוץ הלכות ומטא-דאטה ממתינים בתור.") - return { - "status": "completed", - "case_law_id": str(case_law_id), - "chunks": stored_chunks, - "halachot": 0, - "halachot_pending": True, - "metadata_filled": [], - "pages": page_count, - } - except Exception as e: - logger.exception("ingest_document failed (%s): %s", spec.source_kind, e) - await db.set_case_law_extraction_status(case_law_id, "failed") - await progress("failed", 100, f"כשל בעיבוד: {e}") - raise + # Persist the captured mini-ratio (best-effort; never block ingest on it). + if nevo_ratio: + try: + await db.update_case_law(case_law_id, nevo_ratio=nevo_ratio) + except Exception as e: # noqa: BLE001 — additive metadata, non-fatal + logger.warning("could not store nevo_ratio for %s: %s", case_law_id, e) + + try: + stored_chunks = await _chunk_embed_store(case_law_id, raw_text, page_offsets, page_count, progress) + await db.mark_indexed(case_law_id) + + # Step 9: multimodal — uniform: flag + PDF + page_count, NOT intake type. + if (config.MULTIMODAL_ENABLED and page_count > 0 + and staged is not None and staged.suffix.lower() == ".pdf"): + try: + await progress("embedding_images", 70, f"מטמיע {page_count} עמודי תמונה (multimodal)") + await _embed_pages(case_law_id, staged, page_count) + except Exception as e: + logger.warning("Multimodal embedding failed (non-fatal): %s", e) + + # Steps 10-12: queue BOTH extractions (GAP-02 fix) + statuses. + await db.set_case_law_extraction_status(case_law_id, "completed") + await db.set_case_law_halacha_status(case_law_id, "pending") + await db.request_metadata_extraction(case_law_id) + await db.request_halacha_extraction(case_law_id) + await db.recompute_searchable(case_law_id) + + await progress("completed", 100, + f"נקלט: {stored_chunks} chunks. חילוץ הלכות ומטא-דאטה ממתינים בתור.") + return { + "status": "completed", + "case_law_id": str(case_law_id), + "chunks": stored_chunks, + "halachot": 0, + "halachot_pending": True, + "metadata_filled": [], + "pages": page_count, + } + except Exception as e: + logger.exception("ingest_document failed (%s): %s", spec.source_kind, e) + await db.set_case_law_extraction_status(case_law_id, "failed") + await progress("failed", 100, f"כשל בעיבוד: {e}") + raise + finally: + # Drop the temp download (s3-only); on filesystem/dual ``staged`` is the + # canonical on-disk copy and must NOT be removed. + if staged_is_tmp and staged is not None: + try: + staged.unlink(missing_ok=True) + except OSError as e: # noqa: BLE001 — temp cleanup, never fatal + logger.debug("could not remove temp staged file %s: %s", staged, e) async def _chunk_embed_store(case_law_id, text, page_offsets, page_count, progress) -> int: diff --git a/mcp-server/tests/test_storage_staging.py b/mcp-server/tests/test_storage_staging.py index 9cf6cea..7768d1a 100644 --- a/mcp-server/tests/test_storage_staging.py +++ b/mcp-server/tests/test_storage_staging.py @@ -30,20 +30,24 @@ def test_stage_file_lands_under_datadir(_tmp_datadir): src.parent.mkdir(parents=True) src.write_bytes(b"%PDF-1.4 ...") root = _tmp_datadir / "precedent-library" - dest = run(ingest._stage_file(src, root, "court_ruling")) - # dest is under the staging subdir, prefixed with a uuid, original suffix kept + # _stage_file returns the DATA_DIR-relative storage KEY (what the DB stores); + # the caller resolves a local path via storage.ensure_local. + key = run(ingest._stage_file(src, root, "court_ruling")) + assert isinstance(key, str) + assert key.startswith("precedent-library/court_ruling/") + assert key.endswith(".pdf") + # under the filesystem backend the key maps to the exact legacy on-disk path + dest = _tmp_datadir / key assert dest.parent == root / "court_ruling" assert dest.exists() assert dest.read_bytes() == b"%PDF-1.4 ..." - assert dest.suffix == ".pdf" - # and the key is DATA_DIR-relative (what the DB column will store) - assert dest.relative_to(_tmp_datadir).as_posix().startswith("precedent-library/court_ruling/") def test_stage_file_default_subdir(_tmp_datadir): src = _tmp_datadir / "x.docx" src.write_bytes(b"doc") - dest = run(ingest._stage_file(src, _tmp_datadir / "digests", "")) + key = run(ingest._stage_file(src, _tmp_datadir / "digests", "")) + dest = _tmp_datadir / key assert dest.parent == _tmp_datadir / "digests" / "other" assert dest.exists() diff --git a/mcp-server/tests/test_unified_ingest.py b/mcp-server/tests/test_unified_ingest.py index 0fd0ea4..8ad703d 100644 --- a/mcp-server/tests/test_unified_ingest.py +++ b/mcp-server/tests/test_unified_ingest.py @@ -172,3 +172,58 @@ def test_display_name_fallback_uses_canonical_id(patched, tmp_path): case_number="8046/24", text="t", chair_name="x", practice_area="betterment_levy")) kind, kw = patched["create"][0] assert kw["case_name"] == "8046/24", "missing case_name falls back to canonical id" + + +def test_ingest_reads_via_ensure_local_when_no_disk_copy(patched, tmp_path, monkeypatch): + """Regression: under the s3-only backend the staged key has NO on-disk file, + so reading the DATA_DIR path directly 500'd ('Package not found at …'). + ingest must resolve a readable local path via storage.ensure_local (a temp + download) and clean it up afterwards. + """ + from legal_mcp.services import storage + + store: dict[str, bytes] = {} + + class _MemBackend(storage.StorageBackend): + """In-memory backend with NO local copy — mimics s3-only.""" + name = "mem" + + async def put_bytes(self, key, data, *, bucket=storage.Bucket.DOCUMENTS, + content_type=None, metadata=None): + store[storage.normalize_key(key)] = bytes(data) + return f"mem://{key}" + + async def put_file(self, src, key, *, bucket=storage.Bucket.DOCUMENTS, + content_type=None, metadata=None): + store[storage.normalize_key(key)] = Path(src).read_bytes() + return f"mem://{key}" + + async def get_bytes(self, key, *, bucket=storage.Bucket.DOCUMENTS): + return store[storage.normalize_key(key)] + + async def exists(self, key, *, bucket=storage.Bucket.DOCUMENTS): + return storage.normalize_key(key) in store + + def local_path(self, key, *, bucket=storage.Bucket.DOCUMENTS): + return None # the s3-only condition: nothing on disk + + monkeypatch.setattr(storage, "_singleton", _MemBackend()) + + # An extract_text that actually READS the path it is handed — proves ingest + # passes a real, readable local file rather than a phantom DATA_DIR path. + seen = [] + + async def _extract_reads(path): + seen.append(path) + assert Path(path).read_bytes() == b"%PDF-1.4 fake" + return ("full decision text", 1, [0]) + + monkeypatch.setattr(extractor, "extract_text", _extract_reads) + + out = _run(precedent_library.ingest_precedent( + file_path=_make_pdf(tmp_path), citation="עע\"מ 1234/20", + practice_area="rishuy_uvniya", source_type="court_ruling", + )) + assert out["status"] == "completed" + # the temp download was cleaned up (no /tmp leak) + assert seen and not Path(seen[0]).exists()