fix(ingest): קריאת קובץ‎-staging דרך storage.ensure_local — תיקון 500 בהעלאה תחת s3-only (INV-STG1) #228

Merged
chaim merged 1 commit from worktree-s3-ingest-readpath into main 2026-06-12 07:33:15 +00:00
4 changed files with 180 additions and 89 deletions
Showing only changes of commit 4f7c3733e2 - Show all commits

View File

@@ -30,7 +30,7 @@ from typing import Awaitable, Callable
from uuid import UUID
from legal_mcp import config
from legal_mcp.services import db, embeddings, extractor, ingest
from legal_mcp.services import db, embeddings, extractor, ingest, storage
logger = logging.getLogger(__name__)
@@ -144,12 +144,23 @@ async def create_pending_digest(
raise ValueError(f"file not found: {file_path}")
await progress("staging", 10, "מעתיק קובץ")
staged = await ingest._stage_file(src, DIGEST_LIBRARY_DIR, "incoming")
rel_path = str(staged.relative_to(config.DATA_DIR)) \
if str(staged).startswith(str(config.DATA_DIR)) else str(staged)
# ``_stage_file`` returns the storage KEY (DATA_DIR-relative path). Resolve a
# real local path to read from — on s3-only this downloads to a temp file we
# own and remove after extraction (INV-STG1; the key is not guaranteed to be
# on local disk).
rel_path = await ingest._stage_file(src, DIGEST_LIBRARY_DIR, "incoming")
local = await storage.ensure_local(rel_path, bucket=storage.Bucket.DOCUMENTS)
local_is_tmp = storage.local_path(rel_path, bucket=storage.Bucket.DOCUMENTS) is None
await progress("extracting_text", 50, "מחלץ טקסט")
raw_text, _pc, _off = await extractor.extract_text(str(staged))
try:
raw_text, _pc, _off = await extractor.extract_text(str(local))
finally:
if local_is_tmp:
try:
local.unlink(missing_ok=True)
except OSError as e: # noqa: BLE001 — temp cleanup, never fatal
logger.debug("could not remove temp digest file %s: %s", local, e)
raw_text = (raw_text or "").strip()
if not raw_text:
raise ValueError("no text extracted from digest")

View File

@@ -66,13 +66,14 @@ def _safe_filename(name: str) -> str:
return re.sub(r"[^\w.\-+א-ת ]", "_", base) or f"upload-{uuid4().hex[:8]}"
async def _stage_file(src_path: Path, root: Path, subdir: str) -> Path:
async def _stage_file(src_path: Path, root: Path, subdir: str) -> str:
"""Stage an intake file through the unified storage layer (INV-STG1).
Returns the DATA_DIR path the rest of the pipeline reads from — under the
filesystem/dual backends the bytes are on disk and the key is the
DATA_DIR-relative path. The Hebrew original filename rides as object
metadata, never as the key (INV-STG2)."""
Returns the storage KEY (DATA_DIR-relative path) the blob was written under.
The caller resolves a readable local path via ``storage.ensure_local`` — the
key is NOT guaranteed to map to an existing on-disk file (under the s3-only
backend the bytes live only in object storage). The Hebrew original filename
rides as object metadata, never as the key (INV-STG2)."""
dest = root / (subdir or "other") / f"{uuid4().hex[:8]}_{_safe_filename(src_path.name)}"
key = dest.relative_to(config.DATA_DIR).as_posix()
await storage.put_file(
@@ -80,7 +81,7 @@ async def _stage_file(src_path: Path, root: Path, subdir: str) -> Path:
content_type=mimetypes.guess_type(src_path.name)[0],
metadata={"filename": src_path.name},
)
return dest
return key
def _validate_enums(spec: IntakeSpec, inputs: dict) -> None:
@@ -155,12 +156,24 @@ async def ingest_document(
page_count = 0
page_offsets = None
staged: Path | None = None
staged_is_tmp = False
if file_path:
src = Path(file_path)
if not src.is_file():
raise FileNotFoundError(f"file not found: {src}")
await progress("staging", 5, "מעתיק את הקובץ לאחסון")
staged = await _stage_file(src, spec.staging_root, spec.staging_subdir(inputs))
staged_key = await _stage_file(src, spec.staging_root, spec.staging_subdir(inputs))
# Resolve a real local path to read from. Under filesystem/dual this is
# the on-disk copy; under s3-only the blob lives only in object storage,
# so ensure_local downloads it to a temp file we own and must clean up
# (INV-STG1 — the pipeline must read through the storage layer, never
# assume the key maps to an existing DATA_DIR file).
staged_is_tmp = storage.local_path(
staged_key, bucket=storage.Bucket.DOCUMENTS) is None
staged = await storage.ensure_local(
staged_key, bucket=storage.Bucket.DOCUMENTS)
try:
if staged is not None:
await progress("extracting", 15, "מחלץ טקסט מהקובץ")
try:
raw_text, page_count, page_offsets = await extractor.extract_text(str(staged))
@@ -237,6 +250,14 @@ async def ingest_document(
await db.set_case_law_extraction_status(case_law_id, "failed")
await progress("failed", 100, f"כשל בעיבוד: {e}")
raise
finally:
# Drop the temp download (s3-only); on filesystem/dual ``staged`` is the
# canonical on-disk copy and must NOT be removed.
if staged_is_tmp and staged is not None:
try:
staged.unlink(missing_ok=True)
except OSError as e: # noqa: BLE001 — temp cleanup, never fatal
logger.debug("could not remove temp staged file %s: %s", staged, e)
async def _chunk_embed_store(case_law_id, text, page_offsets, page_count, progress) -> int:

View File

@@ -30,20 +30,24 @@ def test_stage_file_lands_under_datadir(_tmp_datadir):
src.parent.mkdir(parents=True)
src.write_bytes(b"%PDF-1.4 ...")
root = _tmp_datadir / "precedent-library"
dest = run(ingest._stage_file(src, root, "court_ruling"))
# dest is under the staging subdir, prefixed with a uuid, original suffix kept
# _stage_file returns the DATA_DIR-relative storage KEY (what the DB stores);
# the caller resolves a local path via storage.ensure_local.
key = run(ingest._stage_file(src, root, "court_ruling"))
assert isinstance(key, str)
assert key.startswith("precedent-library/court_ruling/")
assert key.endswith(".pdf")
# under the filesystem backend the key maps to the exact legacy on-disk path
dest = _tmp_datadir / key
assert dest.parent == root / "court_ruling"
assert dest.exists()
assert dest.read_bytes() == b"%PDF-1.4 ..."
assert dest.suffix == ".pdf"
# and the key is DATA_DIR-relative (what the DB column will store)
assert dest.relative_to(_tmp_datadir).as_posix().startswith("precedent-library/court_ruling/")
def test_stage_file_default_subdir(_tmp_datadir):
src = _tmp_datadir / "x.docx"
src.write_bytes(b"doc")
dest = run(ingest._stage_file(src, _tmp_datadir / "digests", ""))
key = run(ingest._stage_file(src, _tmp_datadir / "digests", ""))
dest = _tmp_datadir / key
assert dest.parent == _tmp_datadir / "digests" / "other"
assert dest.exists()

View File

@@ -172,3 +172,58 @@ def test_display_name_fallback_uses_canonical_id(patched, tmp_path):
case_number="8046/24", text="t", chair_name="x", practice_area="betterment_levy"))
kind, kw = patched["create"][0]
assert kw["case_name"] == "8046/24", "missing case_name falls back to canonical id"
def test_ingest_reads_via_ensure_local_when_no_disk_copy(patched, tmp_path, monkeypatch):
    """Regression test for ingesting when the staged key has no on-disk copy.

    With an s3-only style backend the staged key does not correspond to a file
    on local disk, and reading the DATA_DIR path directly used to fail with a
    500 ('Package not found at …'). The ingest path has to obtain a readable
    local file through ``storage.ensure_local`` (a temporary download) and
    remove that file once it is done.
    """
    from legal_mcp.services import storage

    # Everything "uploaded" during the test lives here, keyed by normalized key.
    blobs: dict[str, bytes] = {}

    class _DisklessBackend(storage.StorageBackend):
        """Memory-only backend that never exposes a local path (s3-only stand-in)."""

        name = "mem"

        async def put_bytes(self, key, data, *, bucket=storage.Bucket.DOCUMENTS,
                            content_type=None, metadata=None):
            blobs[storage.normalize_key(key)] = bytes(data)
            return f"mem://{key}"

        async def put_file(self, src, key, *, bucket=storage.Bucket.DOCUMENTS,
                           content_type=None, metadata=None):
            blobs[storage.normalize_key(key)] = Path(src).read_bytes()
            return f"mem://{key}"

        async def get_bytes(self, key, *, bucket=storage.Bucket.DOCUMENTS):
            return blobs[storage.normalize_key(key)]

        async def exists(self, key, *, bucket=storage.Bucket.DOCUMENTS):
            return storage.normalize_key(key) in blobs

        def local_path(self, key, *, bucket=storage.Bucket.DOCUMENTS):
            # This is the condition under test: there is never a file on disk.
            return None

    monkeypatch.setattr(storage, "_singleton", _DisklessBackend())

    # The extractor stub opens the path it receives, so the test fails unless
    # ingest hands over a file that really exists and holds the staged bytes.
    handed_paths = []

    async def _reading_extract(path):
        handed_paths.append(path)
        assert Path(path).read_bytes() == b"%PDF-1.4 fake"
        return ("full decision text", 1, [0])

    monkeypatch.setattr(extractor, "extract_text", _reading_extract)

    result = _run(precedent_library.ingest_precedent(
        file_path=_make_pdf(tmp_path),
        citation="עע\"מ 1234/20",
        practice_area="rishuy_uvniya",
        source_type="court_ruling",
    ))

    assert result["status"] == "completed"
    # The extractor was called, and the file it was given is gone afterwards,
    # i.e. the temporary download did not leak.
    assert handed_paths
    assert not Path(handed_paths[0]).exists()