From 1986fe3b14169db9f8e82e8540ae618c2aca15b2 Mon Sep 17 00:00:00 2001 From: Chaim Date: Mon, 8 Jun 2026 08:00:27 +0000 Subject: [PATCH] =?UTF-8?q?feat(storage):=20X14=20Phase=202a=20=E2=80=94?= =?UTF-8?q?=20route=20source-document=20writes=20through=20storage.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rewire the source-document staging writes onto the unified storage layer (INV-STG1), replacing direct shutil.copy2 calls: - tools/documents.py: case originals + training-corpus uploads - services/ingest.py: _stage_file (now async) — covers precedent-library, internal-decisions, and digests (the canonical intake helper) - services/digest_library.py: awaits the now-async _stage_file Each write goes through storage.put_file(..., bucket=DOCUMENTS) with the DATA_DIR-relative key; the Hebrew original filename rides as object metadata (INV-STG2), content-type is guessed from the extension. DB path columns are unchanged (still the absolute dest) — object_key backfill is Phase 3. Under the default STORAGE_BACKEND=filesystem the bytes land at the exact legacy on-disk location (put_file → shutil.copy2 to DATA_DIR/key), so this is zero behaviour change in prod. shutil import dropped where now unused. tests: +2 staging regression tests (file lands under DATA_DIR at the legacy path); 20 storage + 22 ingest tests green; 242 collected with no import breakage. Derived/export write sites (thumbnails, extracted text, DOCX exports) are Phase 2b. Keeps G2; advances INV-STG1. Spec: docs/spec/X14-storage-minio.md. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../src/legal_mcp/services/digest_library.py | 2 +- mcp-server/src/legal_mcp/services/ingest.py | 25 ++++++---- mcp-server/src/legal_mcp/tools/documents.py | 25 ++++++---- mcp-server/tests/test_storage_staging.py | 48 +++++++++++++++++++ 4 files changed, 82 insertions(+), 18 deletions(-) create mode 100644 mcp-server/tests/test_storage_staging.py diff --git a/mcp-server/src/legal_mcp/services/digest_library.py b/mcp-server/src/legal_mcp/services/digest_library.py index e2e4793..6371139 100644 --- a/mcp-server/src/legal_mcp/services/digest_library.py +++ b/mcp-server/src/legal_mcp/services/digest_library.py @@ -144,7 +144,7 @@ async def create_pending_digest( raise ValueError(f"file not found: {file_path}") await progress("staging", 10, "מעתיק קובץ") - staged = ingest._stage_file(src, DIGEST_LIBRARY_DIR, "incoming") + staged = await ingest._stage_file(src, DIGEST_LIBRARY_DIR, "incoming") rel_path = str(staged.relative_to(config.DATA_DIR)) \ if str(staged).startswith(str(config.DATA_DIR)) else str(staged) diff --git a/mcp-server/src/legal_mcp/services/ingest.py b/mcp-server/src/legal_mcp/services/ingest.py index 882cc1b..fabd9b6 100644 --- a/mcp-server/src/legal_mcp/services/ingest.py +++ b/mcp-server/src/legal_mcp/services/ingest.py @@ -14,8 +14,8 @@ from __future__ import annotations import asyncio import logging +import mimetypes import re -import shutil from dataclasses import dataclass from datetime import date from pathlib import Path @@ -23,7 +23,7 @@ from typing import Awaitable, Callable from uuid import UUID, uuid4 from legal_mcp import config -from legal_mcp.services import chunker, db, embeddings, extractor +from legal_mcp.services import chunker, db, embeddings, extractor, storage logger = logging.getLogger(__name__) @@ -66,11 +66,20 @@ def _safe_filename(name: str) -> str: return re.sub(r"[^\w.\-+א-ת ]", "_", base) or f"upload-{uuid4().hex[:8]}" -def _stage_file(src_path: Path, root: Path, subdir: str) -> Path: - dest_dir = root / (subdir or "other") - dest_dir.mkdir(parents=True, exist_ok=True) - dest = dest_dir / f"{uuid4().hex[:8]}_{_safe_filename(src_path.name)}" - shutil.copy2(src_path, dest) +async def _stage_file(src_path: Path, root: Path, subdir: str) -> Path: + """Stage an intake file through the unified storage layer (INV-STG1). + + Returns the DATA_DIR path the rest of the pipeline reads from — under the + filesystem/dual backends the bytes are on disk and the key is the + DATA_DIR-relative path. The Hebrew original filename rides as object + metadata, never as the key (INV-STG2).""" + dest = root / (subdir or "other") / f"{uuid4().hex[:8]}_{_safe_filename(src_path.name)}" + key = dest.relative_to(config.DATA_DIR).as_posix() + await storage.put_file( + src_path, key, bucket=storage.Bucket.DOCUMENTS, + content_type=mimetypes.guess_type(src_path.name)[0], + metadata={"filename": src_path.name}, + ) return dest @@ -151,7 +160,7 @@ async def ingest_document( if not src.is_file(): raise FileNotFoundError(f"file not found: {src}") await progress("staging", 5, "מעתיק את הקובץ לאחסון") - staged = _stage_file(src, spec.staging_root, spec.staging_subdir(inputs)) + staged = await _stage_file(src, spec.staging_root, spec.staging_subdir(inputs)) await progress("extracting", 15, "מחלץ טקסט מהקובץ") try: raw_text, page_count, page_offsets = await extractor.extract_text(str(staged)) diff --git a/mcp-server/src/legal_mcp/tools/documents.py b/mcp-server/src/legal_mcp/tools/documents.py index 39aafac..b02c4cc 100644 --- a/mcp-server/src/legal_mcp/tools/documents.py +++ b/mcp-server/src/legal_mcp/tools/documents.py @@ -4,12 +4,12 @@ from __future__ import annotations import hashlib import json -import shutil +import mimetypes from pathlib import Path from uuid import UUID from legal_mcp import config -from legal_mcp.services import audit, db, git_sync, processor +from legal_mcp.services import audit, db, git_sync, processor, storage from legal_mcp.tools.envelope import empty, err, ok # GAP-48: SSoT envelope @@ -50,11 +50,14 @@ async def document_upload( "idempotent_existing": True, }, message=f"הקובץ כבר הועלה לתיק {case_number} (זהה ב-hash) — מוחזר הקיים, ללא עיבוד מחדש.") - # Copy file to case directory - case_dir = config.find_case_dir(case_number) / "documents" / "originals" - case_dir.mkdir(parents=True, exist_ok=True) - dest = case_dir / source.name - shutil.copy2(str(source), str(dest)) + # Stage the original through the unified storage layer (INV-STG1). + dest = config.find_case_dir(case_number) / "documents" / "originals" / source.name + await storage.put_file( + source, dest.relative_to(config.DATA_DIR).as_posix(), + bucket=storage.Bucket.DOCUMENTS, + content_type=mimetypes.guess_type(source.name)[0], + metadata={"filename": source.name}, + ) # For auto classification, start with "reference" — will be updated after processing initial_doc_type = doc_type if doc_type != "auto" else "reference" @@ -156,10 +159,14 @@ async def document_upload_training( } subdir = _SUBTYPE_DIRS.get(appeal_subtype, "") training_dest = config.TRAINING_DIR / subdir if subdir else config.TRAINING_DIR - training_dest.mkdir(parents=True, exist_ok=True) dest = training_dest / source.name if source.resolve() != dest.resolve(): - shutil.copy2(str(source), str(dest)) + await storage.put_file( + source, dest.relative_to(config.DATA_DIR).as_posix(), + bucket=storage.Bucket.DOCUMENTS, + content_type=mimetypes.guess_type(source.name)[0], + metadata={"filename": source.name}, + ) # Extract text and strip Nevo preamble text, page_count, _ = await extractor.extract_text(str(dest)) diff --git a/mcp-server/tests/test_storage_staging.py b/mcp-server/tests/test_storage_staging.py new file mode 100644 index 0000000..9ce6d2b --- /dev/null +++ b/mcp-server/tests/test_storage_staging.py @@ -0,0 +1,48 @@ +"""Regression tests for the write call-sites rewired onto storage.py (X14 +Phase 2). They assert the rewired staging lands bytes at the exact legacy +on-disk location under the default filesystem backend — i.e. zero behaviour +change. +""" +import asyncio +from pathlib import Path + +import pytest + +from legal_mcp import config +from legal_mcp.services import ingest, storage + + +@pytest.fixture(autouse=True) +def _tmp_datadir(tmp_path, monkeypatch): + monkeypatch.setattr(config, "DATA_DIR", tmp_path) + monkeypatch.setattr(config, "STORAGE_BACKEND", "filesystem") + storage.reset_storage_cache() + yield tmp_path + storage.reset_storage_cache() + + +def run(coro): + return asyncio.run(coro) + + +def test_stage_file_lands_under_datadir(_tmp_datadir): + src = _tmp_datadir / "src" / "כתב ערר.pdf" + src.parent.mkdir(parents=True) + src.write_bytes(b"%PDF-1.4 ...") + root = _tmp_datadir / "precedent-library" + dest = run(ingest._stage_file(src, root, "court_ruling")) + # dest is under the staging subdir, prefixed with a uuid, original suffix kept + assert dest.parent == root / "court_ruling" + assert dest.exists() + assert dest.read_bytes() == b"%PDF-1.4 ..." + assert dest.suffix == ".pdf" + # and the key is DATA_DIR-relative (what the DB column will store) + assert dest.relative_to(_tmp_datadir).as_posix().startswith("precedent-library/court_ruling/") + + +def test_stage_file_default_subdir(_tmp_datadir): + src = _tmp_datadir / "x.docx" + src.write_bytes(b"doc") + dest = run(ingest._stage_file(src, _tmp_datadir / "digests", "")) + assert dest.parent == _tmp_datadir / "digests" / "other" + assert dest.exists() -- 2.49.1