Merge pull request 'feat(storage): X14 Phase 2a — route source-document writes through storage.py' (#152) from worktree-storage-minio-phase2 into main
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 1m30s

This commit was merged in pull request #152.
This commit is contained in:
2026-06-08 08:01:00 +00:00
4 changed files with 82 additions and 18 deletions

View File

@@ -144,7 +144,7 @@ async def create_pending_digest(
raise ValueError(f"file not found: {file_path}") raise ValueError(f"file not found: {file_path}")
await progress("staging", 10, "מעתיק קובץ") await progress("staging", 10, "מעתיק קובץ")
staged = ingest._stage_file(src, DIGEST_LIBRARY_DIR, "incoming") staged = await ingest._stage_file(src, DIGEST_LIBRARY_DIR, "incoming")
rel_path = str(staged.relative_to(config.DATA_DIR)) \ rel_path = str(staged.relative_to(config.DATA_DIR)) \
if str(staged).startswith(str(config.DATA_DIR)) else str(staged) if str(staged).startswith(str(config.DATA_DIR)) else str(staged)

View File

@@ -14,8 +14,8 @@ from __future__ import annotations
import asyncio import asyncio
import logging import logging
import mimetypes
import re import re
import shutil
from dataclasses import dataclass from dataclasses import dataclass
from datetime import date from datetime import date
from pathlib import Path from pathlib import Path
@@ -23,7 +23,7 @@ from typing import Awaitable, Callable
from uuid import UUID, uuid4 from uuid import UUID, uuid4
from legal_mcp import config from legal_mcp import config
from legal_mcp.services import chunker, db, embeddings, extractor from legal_mcp.services import chunker, db, embeddings, extractor, storage
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -66,11 +66,20 @@ def _safe_filename(name: str) -> str:
return re.sub(r"[^\w.\-+א-ת ]", "_", base) or f"upload-{uuid4().hex[:8]}" return re.sub(r"[^\w.\-+א-ת ]", "_", base) or f"upload-{uuid4().hex[:8]}"
def _stage_file(src_path: Path, root: Path, subdir: str) -> Path: async def _stage_file(src_path: Path, root: Path, subdir: str) -> Path:
dest_dir = root / (subdir or "other") """Stage an intake file through the unified storage layer (INV-STG1).
dest_dir.mkdir(parents=True, exist_ok=True)
dest = dest_dir / f"{uuid4().hex[:8]}_{_safe_filename(src_path.name)}" Returns the DATA_DIR path the rest of the pipeline reads from — under the
shutil.copy2(src_path, dest) filesystem/dual backends the bytes are on disk and the key is the
DATA_DIR-relative path. The Hebrew original filename rides as object
metadata, never as the key (INV-STG2)."""
dest = root / (subdir or "other") / f"{uuid4().hex[:8]}_{_safe_filename(src_path.name)}"
key = dest.relative_to(config.DATA_DIR).as_posix()
await storage.put_file(
src_path, key, bucket=storage.Bucket.DOCUMENTS,
content_type=mimetypes.guess_type(src_path.name)[0],
metadata={"filename": src_path.name},
)
return dest return dest
@@ -151,7 +160,7 @@ async def ingest_document(
if not src.is_file(): if not src.is_file():
raise FileNotFoundError(f"file not found: {src}") raise FileNotFoundError(f"file not found: {src}")
await progress("staging", 5, "מעתיק את הקובץ לאחסון") await progress("staging", 5, "מעתיק את הקובץ לאחסון")
staged = _stage_file(src, spec.staging_root, spec.staging_subdir(inputs)) staged = await _stage_file(src, spec.staging_root, spec.staging_subdir(inputs))
await progress("extracting", 15, "מחלץ טקסט מהקובץ") await progress("extracting", 15, "מחלץ טקסט מהקובץ")
try: try:
raw_text, page_count, page_offsets = await extractor.extract_text(str(staged)) raw_text, page_count, page_offsets = await extractor.extract_text(str(staged))

View File

@@ -4,12 +4,12 @@ from __future__ import annotations
import hashlib import hashlib
import json import json
import shutil import mimetypes
from pathlib import Path from pathlib import Path
from uuid import UUID from uuid import UUID
from legal_mcp import config from legal_mcp import config
from legal_mcp.services import audit, db, git_sync, processor from legal_mcp.services import audit, db, git_sync, processor, storage
from legal_mcp.tools.envelope import empty, err, ok # GAP-48: SSoT envelope from legal_mcp.tools.envelope import empty, err, ok # GAP-48: SSoT envelope
@@ -50,11 +50,14 @@ async def document_upload(
"idempotent_existing": True, "idempotent_existing": True,
}, message=f"הקובץ כבר הועלה לתיק {case_number} (זהה ב-hash) — מוחזר הקיים, ללא עיבוד מחדש.") }, message=f"הקובץ כבר הועלה לתיק {case_number} (זהה ב-hash) — מוחזר הקיים, ללא עיבוד מחדש.")
# Copy file to case directory # Stage the original through the unified storage layer (INV-STG1).
case_dir = config.find_case_dir(case_number) / "documents" / "originals" dest = config.find_case_dir(case_number) / "documents" / "originals" / source.name
case_dir.mkdir(parents=True, exist_ok=True) await storage.put_file(
dest = case_dir / source.name source, dest.relative_to(config.DATA_DIR).as_posix(),
shutil.copy2(str(source), str(dest)) bucket=storage.Bucket.DOCUMENTS,
content_type=mimetypes.guess_type(source.name)[0],
metadata={"filename": source.name},
)
# For auto classification, start with "reference" — will be updated after processing # For auto classification, start with "reference" — will be updated after processing
initial_doc_type = doc_type if doc_type != "auto" else "reference" initial_doc_type = doc_type if doc_type != "auto" else "reference"
@@ -156,10 +159,14 @@ async def document_upload_training(
} }
subdir = _SUBTYPE_DIRS.get(appeal_subtype, "") subdir = _SUBTYPE_DIRS.get(appeal_subtype, "")
training_dest = config.TRAINING_DIR / subdir if subdir else config.TRAINING_DIR training_dest = config.TRAINING_DIR / subdir if subdir else config.TRAINING_DIR
training_dest.mkdir(parents=True, exist_ok=True)
dest = training_dest / source.name dest = training_dest / source.name
if source.resolve() != dest.resolve(): if source.resolve() != dest.resolve():
shutil.copy2(str(source), str(dest)) await storage.put_file(
source, dest.relative_to(config.DATA_DIR).as_posix(),
bucket=storage.Bucket.DOCUMENTS,
content_type=mimetypes.guess_type(source.name)[0],
metadata={"filename": source.name},
)
# Extract text and strip Nevo preamble # Extract text and strip Nevo preamble
text, page_count, _ = await extractor.extract_text(str(dest)) text, page_count, _ = await extractor.extract_text(str(dest))

View File

@@ -0,0 +1,48 @@
"""Regression tests for the write call-sites rewired onto storage.py (X14
Phase 2). They assert the rewired staging lands bytes at the exact legacy
on-disk location under the default filesystem backend — i.e. zero behaviour
change.
"""
import asyncio
from pathlib import Path
import pytest
from legal_mcp import config
from legal_mcp.services import ingest, storage
@pytest.fixture(autouse=True)
def _tmp_datadir(tmp_path, monkeypatch):
    """Redirect storage to a throwaway directory for every test in this module.

    Patches ``config.DATA_DIR`` to pytest's ``tmp_path`` and pins
    ``config.STORAGE_BACKEND`` to ``"filesystem"``; ``monkeypatch`` undoes both
    patches when the test finishes.

    Yields:
        The temporary directory now acting as ``DATA_DIR``.
    """
    monkeypatch.setattr(config, "DATA_DIR", tmp_path)
    monkeypatch.setattr(config, "STORAGE_BACKEND", "filesystem")
    # Reset before the test so the patched config is what storage sees
    # (presumably this drops a cached backend instance — confirm in storage.py).
    storage.reset_storage_cache()
    yield tmp_path
    # Reset again afterwards so nothing built against tmp_path outlives the test.
    storage.reset_storage_cache()
def run(coro):
    """Drive the coroutine *coro* to completion and return its result.

    Thin wrapper over ``asyncio.run`` so the synchronous tests in this module
    can call the async staging helpers.
    """
    return asyncio.run(coro)
def test_stage_file_lands_under_datadir(_tmp_datadir):
    """Staging a file writes its bytes under ``<root>/<subdir>`` inside DATA_DIR.

    Uses a Hebrew source filename to exercise the non-ASCII path.
    """
    src = _tmp_datadir / "src" / "כתב ערר.pdf"
    src.parent.mkdir(parents=True)
    src.write_bytes(b"%PDF-1.4 ...")
    root = _tmp_datadir / "precedent-library"

    dest = run(ingest._stage_file(src, root, "court_ruling"))

    # dest is under the staging subdir, prefixed with a uuid, original suffix kept
    assert dest.parent == root / "court_ruling"
    assert dest.exists()
    assert dest.read_bytes() == b"%PDF-1.4 ..."
    assert dest.suffix == ".pdf"
    # The staged name is "<8 hex chars>_<sanitised original name>"; check the
    # prefix explicitly instead of only describing it in a comment.
    prefix, sep, _rest = dest.name.partition("_")
    assert sep == "_"
    assert len(prefix) == 8
    assert all(ch in "0123456789abcdef" for ch in prefix)
    # and the key is DATA_DIR-relative (what the DB column will store)
    assert dest.relative_to(_tmp_datadir).as_posix().startswith("precedent-library/court_ruling/")
def test_stage_file_default_subdir(_tmp_datadir):
    """An empty ``subdir`` stages the file under the ``other`` directory."""
    src = _tmp_datadir / "x.docx"
    src.write_bytes(b"doc")

    dest = run(ingest._stage_file(src, _tmp_datadir / "digests", ""))

    assert dest.parent == _tmp_datadir / "digests" / "other"
    assert dest.exists()
    # Confirm the payload was actually copied, not just that a file was created.
    assert dest.read_bytes() == b"doc"