feat(storage): X14 Phase 2a — route source-document writes through storage.py
Rewire the source-document staging writes onto the unified storage layer
(INV-STG1), replacing direct shutil.copy2 calls:

- tools/documents.py: case originals + training-corpus uploads
- services/ingest.py: _stage_file (now async) — covers precedent-library,
  internal-decisions, and digests (the canonical intake helper)
- services/digest_library.py: awaits the now-async _stage_file

Each write goes through storage.put_file(..., bucket=DOCUMENTS) with the
DATA_DIR-relative key; the Hebrew original filename rides as object
metadata (INV-STG2), content-type is guessed from the extension. DB path
columns are unchanged (still the absolute dest) — object_key backfill is
Phase 3.

Under the default STORAGE_BACKEND=filesystem the bytes land at the exact
legacy on-disk location (put_file → shutil.copy2 to DATA_DIR/key), so this
is zero behaviour change in prod. shutil import dropped where now unused.

tests: +2 staging regression tests (file lands under DATA_DIR at the legacy
path); 20 storage + 22 ingest tests green; 242 collected with no import
breakage.

Derived/export write sites (thumbnails, extracted text, DOCX exports) are
Phase 2b. Keeps G2; advances INV-STG1. Spec: docs/spec/X14-storage-minio.md.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -144,7 +144,7 @@ async def create_pending_digest(
|
|||||||
raise ValueError(f"file not found: {file_path}")
|
raise ValueError(f"file not found: {file_path}")
|
||||||
|
|
||||||
await progress("staging", 10, "מעתיק קובץ")
|
await progress("staging", 10, "מעתיק קובץ")
|
||||||
staged = ingest._stage_file(src, DIGEST_LIBRARY_DIR, "incoming")
|
staged = await ingest._stage_file(src, DIGEST_LIBRARY_DIR, "incoming")
|
||||||
rel_path = str(staged.relative_to(config.DATA_DIR)) \
|
rel_path = str(staged.relative_to(config.DATA_DIR)) \
|
||||||
if str(staged).startswith(str(config.DATA_DIR)) else str(staged)
|
if str(staged).startswith(str(config.DATA_DIR)) else str(staged)
|
||||||
|
|
||||||
|
|||||||
@@ -14,8 +14,8 @@ from __future__ import annotations
|
|||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
|
import mimetypes
|
||||||
import re
|
import re
|
||||||
import shutil
|
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from datetime import date
|
from datetime import date
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
@@ -23,7 +23,7 @@ from typing import Awaitable, Callable
|
|||||||
from uuid import UUID, uuid4
|
from uuid import UUID, uuid4
|
||||||
|
|
||||||
from legal_mcp import config
|
from legal_mcp import config
|
||||||
from legal_mcp.services import chunker, db, embeddings, extractor
|
from legal_mcp.services import chunker, db, embeddings, extractor, storage
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -66,11 +66,20 @@ def _safe_filename(name: str) -> str:
|
|||||||
return re.sub(r"[^\w.\-+א-ת ]", "_", base) or f"upload-{uuid4().hex[:8]}"
|
return re.sub(r"[^\w.\-+א-ת ]", "_", base) or f"upload-{uuid4().hex[:8]}"
|
||||||
|
|
||||||
|
|
||||||
def _stage_file(src_path: Path, root: Path, subdir: str) -> Path:
|
async def _stage_file(src_path: Path, root: Path, subdir: str) -> Path:
|
||||||
dest_dir = root / (subdir or "other")
|
"""Stage an intake file through the unified storage layer (INV-STG1).
|
||||||
dest_dir.mkdir(parents=True, exist_ok=True)
|
|
||||||
dest = dest_dir / f"{uuid4().hex[:8]}_{_safe_filename(src_path.name)}"
|
Returns the DATA_DIR path the rest of the pipeline reads from — under the
|
||||||
shutil.copy2(src_path, dest)
|
filesystem/dual backends the bytes are on disk and the key is the
|
||||||
|
DATA_DIR-relative path. The Hebrew original filename rides as object
|
||||||
|
metadata, never as the key (INV-STG2)."""
|
||||||
|
dest = root / (subdir or "other") / f"{uuid4().hex[:8]}_{_safe_filename(src_path.name)}"
|
||||||
|
key = dest.relative_to(config.DATA_DIR).as_posix()
|
||||||
|
await storage.put_file(
|
||||||
|
src_path, key, bucket=storage.Bucket.DOCUMENTS,
|
||||||
|
content_type=mimetypes.guess_type(src_path.name)[0],
|
||||||
|
metadata={"filename": src_path.name},
|
||||||
|
)
|
||||||
return dest
|
return dest
|
||||||
|
|
||||||
|
|
||||||
@@ -151,7 +160,7 @@ async def ingest_document(
|
|||||||
if not src.is_file():
|
if not src.is_file():
|
||||||
raise FileNotFoundError(f"file not found: {src}")
|
raise FileNotFoundError(f"file not found: {src}")
|
||||||
await progress("staging", 5, "מעתיק את הקובץ לאחסון")
|
await progress("staging", 5, "מעתיק את הקובץ לאחסון")
|
||||||
staged = _stage_file(src, spec.staging_root, spec.staging_subdir(inputs))
|
staged = await _stage_file(src, spec.staging_root, spec.staging_subdir(inputs))
|
||||||
await progress("extracting", 15, "מחלץ טקסט מהקובץ")
|
await progress("extracting", 15, "מחלץ טקסט מהקובץ")
|
||||||
try:
|
try:
|
||||||
raw_text, page_count, page_offsets = await extractor.extract_text(str(staged))
|
raw_text, page_count, page_offsets = await extractor.extract_text(str(staged))
|
||||||
|
|||||||
@@ -4,12 +4,12 @@ from __future__ import annotations
|
|||||||
|
|
||||||
import hashlib
|
import hashlib
|
||||||
import json
|
import json
|
||||||
import shutil
|
import mimetypes
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from uuid import UUID
|
from uuid import UUID
|
||||||
|
|
||||||
from legal_mcp import config
|
from legal_mcp import config
|
||||||
from legal_mcp.services import audit, db, git_sync, processor
|
from legal_mcp.services import audit, db, git_sync, processor, storage
|
||||||
from legal_mcp.tools.envelope import empty, err, ok # GAP-48: SSoT envelope
|
from legal_mcp.tools.envelope import empty, err, ok # GAP-48: SSoT envelope
|
||||||
|
|
||||||
|
|
||||||
@@ -50,11 +50,14 @@ async def document_upload(
|
|||||||
"idempotent_existing": True,
|
"idempotent_existing": True,
|
||||||
}, message=f"הקובץ כבר הועלה לתיק {case_number} (זהה ב-hash) — מוחזר הקיים, ללא עיבוד מחדש.")
|
}, message=f"הקובץ כבר הועלה לתיק {case_number} (זהה ב-hash) — מוחזר הקיים, ללא עיבוד מחדש.")
|
||||||
|
|
||||||
# Copy file to case directory
|
# Stage the original through the unified storage layer (INV-STG1).
|
||||||
case_dir = config.find_case_dir(case_number) / "documents" / "originals"
|
dest = config.find_case_dir(case_number) / "documents" / "originals" / source.name
|
||||||
case_dir.mkdir(parents=True, exist_ok=True)
|
await storage.put_file(
|
||||||
dest = case_dir / source.name
|
source, dest.relative_to(config.DATA_DIR).as_posix(),
|
||||||
shutil.copy2(str(source), str(dest))
|
bucket=storage.Bucket.DOCUMENTS,
|
||||||
|
content_type=mimetypes.guess_type(source.name)[0],
|
||||||
|
metadata={"filename": source.name},
|
||||||
|
)
|
||||||
|
|
||||||
# For auto classification, start with "reference" — will be updated after processing
|
# For auto classification, start with "reference" — will be updated after processing
|
||||||
initial_doc_type = doc_type if doc_type != "auto" else "reference"
|
initial_doc_type = doc_type if doc_type != "auto" else "reference"
|
||||||
@@ -156,10 +159,14 @@ async def document_upload_training(
|
|||||||
}
|
}
|
||||||
subdir = _SUBTYPE_DIRS.get(appeal_subtype, "")
|
subdir = _SUBTYPE_DIRS.get(appeal_subtype, "")
|
||||||
training_dest = config.TRAINING_DIR / subdir if subdir else config.TRAINING_DIR
|
training_dest = config.TRAINING_DIR / subdir if subdir else config.TRAINING_DIR
|
||||||
training_dest.mkdir(parents=True, exist_ok=True)
|
|
||||||
dest = training_dest / source.name
|
dest = training_dest / source.name
|
||||||
if source.resolve() != dest.resolve():
|
if source.resolve() != dest.resolve():
|
||||||
shutil.copy2(str(source), str(dest))
|
await storage.put_file(
|
||||||
|
source, dest.relative_to(config.DATA_DIR).as_posix(),
|
||||||
|
bucket=storage.Bucket.DOCUMENTS,
|
||||||
|
content_type=mimetypes.guess_type(source.name)[0],
|
||||||
|
metadata={"filename": source.name},
|
||||||
|
)
|
||||||
|
|
||||||
# Extract text and strip Nevo preamble
|
# Extract text and strip Nevo preamble
|
||||||
text, page_count, _ = await extractor.extract_text(str(dest))
|
text, page_count, _ = await extractor.extract_text(str(dest))
|
||||||
|
|||||||
48
mcp-server/tests/test_storage_staging.py
Normal file
48
mcp-server/tests/test_storage_staging.py
Normal file
@@ -0,0 +1,48 @@
|
|||||||
|
"""Regression tests for the write call-sites rewired onto storage.py (X14
|
||||||
|
Phase 2). They assert the rewired staging lands bytes at the exact legacy
|
||||||
|
on-disk location under the default filesystem backend — i.e. zero behaviour
|
||||||
|
change.
|
||||||
|
"""
|
||||||
|
import asyncio
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from legal_mcp import config
|
||||||
|
from legal_mcp.services import ingest, storage
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def _tmp_datadir(tmp_path, monkeypatch):
|
||||||
|
monkeypatch.setattr(config, "DATA_DIR", tmp_path)
|
||||||
|
monkeypatch.setattr(config, "STORAGE_BACKEND", "filesystem")
|
||||||
|
storage.reset_storage_cache()
|
||||||
|
yield tmp_path
|
||||||
|
storage.reset_storage_cache()
|
||||||
|
|
||||||
|
|
||||||
|
def run(coro):
|
||||||
|
return asyncio.run(coro)
|
||||||
|
|
||||||
|
|
||||||
|
def test_stage_file_lands_under_datadir(_tmp_datadir):
|
||||||
|
src = _tmp_datadir / "src" / "כתב ערר.pdf"
|
||||||
|
src.parent.mkdir(parents=True)
|
||||||
|
src.write_bytes(b"%PDF-1.4 ...")
|
||||||
|
root = _tmp_datadir / "precedent-library"
|
||||||
|
dest = run(ingest._stage_file(src, root, "court_ruling"))
|
||||||
|
# dest is under the staging subdir, prefixed with a uuid, original suffix kept
|
||||||
|
assert dest.parent == root / "court_ruling"
|
||||||
|
assert dest.exists()
|
||||||
|
assert dest.read_bytes() == b"%PDF-1.4 ..."
|
||||||
|
assert dest.suffix == ".pdf"
|
||||||
|
# and the storage key is DATA_DIR-relative (what the object_key column will hold after the Phase 3 backfill; DB path columns stay absolute for now)
|
||||||
|
assert dest.relative_to(_tmp_datadir).as_posix().startswith("precedent-library/court_ruling/")
|
||||||
|
|
||||||
|
|
||||||
|
def test_stage_file_default_subdir(_tmp_datadir):
|
||||||
|
src = _tmp_datadir / "x.docx"
|
||||||
|
src.write_bytes(b"doc")
|
||||||
|
dest = run(ingest._stage_file(src, _tmp_datadir / "digests", ""))
|
||||||
|
assert dest.parent == _tmp_datadir / "digests" / "other"
|
||||||
|
assert dest.exists()
|
||||||
Reference in New Issue
Block a user