"""Unified object-storage layer (X14, INV-STG1). THE single choke-point for all binary file I/O — originals, derived artifacts (thumbnails / extracted text), and exports. It replaces the scattered ``open()`` / ``shutil.copy`` / ``Path.write_bytes`` calls spread across ~8 services (G2: one storage path, no parallel routes). See docs/spec/X14-storage-minio.md. Keys ---- A *key* is a DATA_DIR-relative POSIX path, e.g.:: cases/8174-24/documents/originals/.pdf precedent-library/thumbnails//p001.jpg The filesystem backend maps ``key -> DATA_DIR / key``, preserving the exact current on-disk layout (zero behaviour change when ``STORAGE_BACKEND`` is the default ``filesystem``). The S3 backend maps a logical *bucket* (documents/immutable/derived) to a MinIO bucket and uses the key verbatim as the object key. INV-STG2: keys are atomic ASCII/UUID paths; a Hebrew original filename is carried as object metadata / a DB column, never as the key itself. INV-STG5: pgvector stays the source of truth for text + embeddings — this layer stores blobs only. INV-STG6: presigned URLs (minted against the public endpoint) serve bytes straight to the browser. Backends (config.STORAGE_BACKEND) --------------------------------- - ``filesystem`` (default) — disk only; current behaviour. - ``dual`` — write disk + S3; read S3, fall back to disk. The migration window; disk stays authoritative. - ``s3`` — MinIO only. ``aioboto3`` is imported lazily so this module loads even where the dependency is absent (the default filesystem backend needs nothing extra). """ from __future__ import annotations import asyncio import logging import shutil import tempfile from enum import Enum from pathlib import Path, PurePosixPath from typing import Iterable from legal_mcp import config logger = logging.getLogger(__name__) class Bucket(str, Enum): """Logical governance buckets (INV-STG3). 
Resolved to MinIO bucket names via config; ignored by the filesystem backend (which keeps one tree).""" DOCUMENTS = "documents" IMMUTABLE = "immutable" DERIVED = "derived" def _bucket_name(bucket: Bucket) -> str: return { Bucket.DOCUMENTS: config.MINIO_BUCKET_DOCUMENTS, Bucket.IMMUTABLE: config.MINIO_BUCKET_IMMUTABLE, Bucket.DERIVED: config.MINIO_BUCKET_DERIVED, }[bucket] def normalize_key(key: str | Path) -> str: """Return a clean DATA_DIR-relative POSIX key. Rejects absolute paths and ``..`` traversal (defence in depth — keys are built internally, never from raw user input). An absolute path under DATA_DIR is accepted and re-relativised so call-sites can pass either a key or a full ``Path`` during the migration. """ p = Path(key) if p.is_absolute(): try: p = p.relative_to(config.DATA_DIR) except ValueError as exc: raise ValueError(f"absolute path outside DATA_DIR: {key!r}") from exc posix = PurePosixPath(p.as_posix()) parts = posix.parts if not parts or any(part == ".." for part in parts): raise ValueError(f"invalid storage key: {key!r}") return posix.as_posix().lstrip("/") def _ascii_metadata(value) -> str: """Coerce an S3 user-metadata value to ASCII. S3/MinIO object metadata must be ASCII (botocore raises ParamValidationError otherwise). The only non-ASCII value we attach is the original Hebrew filename (``ingest._stage_file`` → ``metadata={"filename": ...}``), so a Hebrew name like ``"יומון 5167 - 11.6.26.pdf"`` would 500 every s3-only upload. Percent-encode non-ASCII losslessly (recover with ``urllib.parse.unquote``) while leaving plain-ASCII values readable.""" s = str(value) if s.isascii(): return s from urllib.parse import quote return quote(s) class StorageBackend: """Abstract backend. 
All methods are async except the cheap path helpers.""" name = "abstract" async def put_bytes(self, key, data, *, bucket=Bucket.DOCUMENTS, content_type=None, metadata=None) -> str: raise NotImplementedError async def put_file(self, src, key, *, bucket=Bucket.DOCUMENTS, content_type=None, metadata=None) -> str: with open(src, "rb") as fh: return await self.put_bytes( key, fh.read(), bucket=bucket, content_type=content_type, metadata=metadata, ) async def get_bytes(self, key, *, bucket=Bucket.DOCUMENTS) -> bytes: raise NotImplementedError async def exists(self, key, *, bucket=Bucket.DOCUMENTS) -> bool: raise NotImplementedError async def delete(self, key, *, bucket=Bucket.DOCUMENTS) -> None: raise NotImplementedError async def list_keys(self, prefix, *, bucket=Bucket.DOCUMENTS) -> list[str]: raise NotImplementedError async def presign_get(self, key, *, bucket=Bucket.DOCUMENTS, ttl=None, download_name=None) -> str: raise NotImplementedError async def presign_put(self, key, *, bucket=Bucket.DOCUMENTS, ttl=None, content_type=None) -> str: raise NotImplementedError def local_path(self, key, *, bucket=Bucket.DOCUMENTS) -> Path | None: """Return a real filesystem path if one exists *without* downloading, else ``None``. Use :meth:`ensure_local` when a path is required.""" return None async def ensure_local(self, key, *, bucket=Bucket.DOCUMENTS) -> Path: """Return a local path to the object, downloading to a temp file if the backend has no on-disk copy. Caller owns cleanup of temp files.""" path = self.local_path(key, bucket=bucket) if path is not None: return path data = await self.get_bytes(key, bucket=bucket) suffix = PurePosixPath(normalize_key(key)).suffix tmp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix) tmp.write(data) tmp.close() return Path(tmp.name) class FilesystemBackend(StorageBackend): """Disk under DATA_DIR. 
``bucket`` is ignored — the existing single tree is preserved verbatim, so the default backend is byte-for-byte the legacy behaviour.""" name = "filesystem" def _abs(self, key, *, bucket=Bucket.DOCUMENTS) -> Path: rel = normalize_key(key) path = (Path(config.DATA_DIR) / rel).resolve() root = Path(config.DATA_DIR).resolve() if root not in path.parents and path != root: raise ValueError(f"resolved path escapes DATA_DIR: {key!r}") return path async def put_bytes(self, key, data, *, bucket=Bucket.DOCUMENTS, content_type=None, metadata=None) -> str: path = self._abs(key, bucket=bucket) path.parent.mkdir(parents=True, exist_ok=True) path.write_bytes(data) return f"file://{path}" async def put_file(self, src, key, *, bucket=Bucket.DOCUMENTS, content_type=None, metadata=None) -> str: path = self._abs(key, bucket=bucket) path.parent.mkdir(parents=True, exist_ok=True) shutil.copy2(src, path) # preserve mtime, as the legacy code did return f"file://{path}" async def get_bytes(self, key, *, bucket=Bucket.DOCUMENTS) -> bytes: return self._abs(key, bucket=bucket).read_bytes() async def exists(self, key, *, bucket=Bucket.DOCUMENTS) -> bool: return self._abs(key, bucket=bucket).exists() async def delete(self, key, *, bucket=Bucket.DOCUMENTS) -> None: self._abs(key, bucket=bucket).unlink(missing_ok=True) async def list_keys(self, prefix, *, bucket=Bucket.DOCUMENTS) -> list[str]: root = Path(config.DATA_DIR).resolve() base = self._abs(prefix, bucket=bucket) if prefix else root if not base.exists(): return [] out: list[str] = [] for p in sorted(base.rglob("*")): if p.is_file(): out.append(p.resolve().relative_to(root).as_posix()) return out def local_path(self, key, *, bucket=Bucket.DOCUMENTS) -> Path | None: path = self._abs(key, bucket=bucket) return path if path.exists() else None async def presign_get(self, key, *, bucket=Bucket.DOCUMENTS, ttl=None, download_name=None) -> str: raise NotImplementedError( "presigned URLs require the S3 backend (set STORAGE_BACKEND=dual|s3)" ) async 
def presign_put(self, key, *, bucket=Bucket.DOCUMENTS, ttl=None, content_type=None) -> str: raise NotImplementedError( "presigned URLs require the S3 backend (set STORAGE_BACKEND=dual|s3)" ) class S3Backend(StorageBackend): """MinIO via aioboto3. Server-side ops use the internal endpoint; presigned URLs are minted against the public endpoint so the browser can reach them (INV-STG6).""" name = "s3" def __init__(self) -> None: self._session = None # lazy aioboto3 session def _boto(self): import aioboto3 # lazy — absent in the default filesystem path from botocore.config import Config as BotoConfig if self._session is None: self._session = aioboto3.Session() cfg = BotoConfig(signature_version="s3v4", s3={"addressing_style": "path"}) return aioboto3, BotoConfig, cfg def _client(self, *, public: bool = False): _aioboto3, _BotoConfig, cfg = self._boto() endpoint = config.MINIO_PUBLIC_ENDPOINT if public else config.MINIO_ENDPOINT return self._session.client( "s3", endpoint_url=endpoint, aws_access_key_id=config.MINIO_ACCESS_KEY, aws_secret_access_key=config.MINIO_SECRET_KEY, region_name=config.MINIO_REGION, config=cfg, ) async def put_bytes(self, key, data, *, bucket=Bucket.DOCUMENTS, content_type=None, metadata=None) -> str: k = normalize_key(key) extra = {} if content_type: extra["ContentType"] = content_type if metadata: extra["Metadata"] = {kk: _ascii_metadata(vv) for kk, vv in metadata.items()} async with self._client() as s3: await s3.put_object(Bucket=_bucket_name(bucket), Key=k, Body=data, **extra) return f"s3://{_bucket_name(bucket)}/{k}" async def get_bytes(self, key, *, bucket=Bucket.DOCUMENTS) -> bytes: k = normalize_key(key) async with self._client() as s3: resp = await s3.get_object(Bucket=_bucket_name(bucket), Key=k) async with resp["Body"] as stream: return await stream.read() async def exists(self, key, *, bucket=Bucket.DOCUMENTS) -> bool: from botocore.exceptions import ClientError k = normalize_key(key) async with self._client() as s3: try: await 
s3.head_object(Bucket=_bucket_name(bucket), Key=k) return True except ClientError as exc: if exc.response["Error"]["Code"] in ("404", "NoSuchKey", "NotFound"): return False raise async def delete(self, key, *, bucket=Bucket.DOCUMENTS) -> None: k = normalize_key(key) async with self._client() as s3: await s3.delete_object(Bucket=_bucket_name(bucket), Key=k) async def list_keys(self, prefix, *, bucket=Bucket.DOCUMENTS) -> list[str]: pfx = normalize_key(prefix) if prefix else "" out: list[str] = [] async with self._client() as s3: paginator = s3.get_paginator("list_objects_v2") async for page in paginator.paginate(Bucket=_bucket_name(bucket), Prefix=pfx): for obj in page.get("Contents", []): out.append(obj["Key"]) return out async def presign_get(self, key, *, bucket=Bucket.DOCUMENTS, ttl=None, download_name=None) -> str: k = normalize_key(key) params = {"Bucket": _bucket_name(bucket), "Key": k} if download_name: # RFC 5987 — keep the Hebrew original filename on download (INV-STG2) from urllib.parse import quote params["ResponseContentDisposition"] = ( f"attachment; filename*=UTF-8''{quote(download_name)}" ) async with self._client(public=True) as s3: return await s3.generate_presigned_url( "get_object", Params=params, ExpiresIn=ttl or config.MINIO_PRESIGN_TTL, ) async def presign_put(self, key, *, bucket=Bucket.DOCUMENTS, ttl=None, content_type=None) -> str: k = normalize_key(key) params = {"Bucket": _bucket_name(bucket), "Key": k} if content_type: params["ContentType"] = content_type async with self._client(public=True) as s3: return await s3.generate_presigned_url( "put_object", Params=params, ExpiresIn=ttl or config.MINIO_PRESIGN_TTL, ) class DualBackend(StorageBackend): """Migration window: writes go to BOTH disk and S3 (disk authoritative); reads prefer S3 and fall back to disk. 
An S3 write failure is logged (never swallowed — engineering §6) but does not break the app while disk holds the canonical copy.""" name = "dual" def __init__(self) -> None: self.fs = FilesystemBackend() self.s3 = S3Backend() async def put_bytes(self, key, data, *, bucket=Bucket.DOCUMENTS, content_type=None, metadata=None) -> str: uri = await self.fs.put_bytes(key, data, bucket=bucket, content_type=content_type, metadata=metadata) try: await self.s3.put_bytes(key, data, bucket=bucket, content_type=content_type, metadata=metadata) except Exception as exc: # noqa: BLE001 — log, don't swallow logger.warning("dual put_bytes: S3 mirror failed for %s: %s", key, exc) return uri async def put_file(self, src, key, *, bucket=Bucket.DOCUMENTS, content_type=None, metadata=None) -> str: uri = await self.fs.put_file(src, key, bucket=bucket, content_type=content_type, metadata=metadata) try: await self.s3.put_file(src, key, bucket=bucket, content_type=content_type, metadata=metadata) except Exception as exc: # noqa: BLE001 logger.warning("dual put_file: S3 mirror failed for %s: %s", key, exc) return uri async def get_bytes(self, key, *, bucket=Bucket.DOCUMENTS) -> bytes: try: return await self.s3.get_bytes(key, bucket=bucket) except Exception as exc: # noqa: BLE001 — fall back to disk logger.debug("dual get_bytes: S3 miss for %s (%s); using disk", key, exc) return await self.fs.get_bytes(key, bucket=bucket) async def exists(self, key, *, bucket=Bucket.DOCUMENTS) -> bool: if await self.fs.exists(key, bucket=bucket): return True try: return await self.s3.exists(key, bucket=bucket) except Exception: # noqa: BLE001 return False async def delete(self, key, *, bucket=Bucket.DOCUMENTS) -> None: await self.fs.delete(key, bucket=bucket) try: await self.s3.delete(key, bucket=bucket) except Exception as exc: # noqa: BLE001 logger.warning("dual delete: S3 delete failed for %s: %s", key, exc) async def list_keys(self, prefix, *, bucket=Bucket.DOCUMENTS) -> list[str]: return await 
self.fs.list_keys(prefix, bucket=bucket) def local_path(self, key, *, bucket=Bucket.DOCUMENTS) -> Path | None: return self.fs.local_path(key, bucket=bucket) async def presign_get(self, key, *, bucket=Bucket.DOCUMENTS, ttl=None, download_name=None) -> str: return await self.s3.presign_get(key, bucket=bucket, ttl=ttl, download_name=download_name) async def presign_put(self, key, *, bucket=Bucket.DOCUMENTS, ttl=None, content_type=None) -> str: return await self.s3.presign_put(key, bucket=bucket, ttl=ttl, content_type=content_type) _BACKENDS = { "filesystem": FilesystemBackend, "dual": DualBackend, "s3": S3Backend, } _singleton: StorageBackend | None = None def get_storage() -> StorageBackend: """Return the process-wide storage backend selected by config.STORAGE_BACKEND (cached). Unknown values fall back to ``filesystem`` with a warning rather than crashing the app.""" global _singleton if _singleton is None: cls = _BACKENDS.get(config.STORAGE_BACKEND) if cls is None: logger.warning( "unknown STORAGE_BACKEND=%r — falling back to filesystem", config.STORAGE_BACKEND, ) cls = FilesystemBackend _singleton = cls() logger.info("storage backend = %s", _singleton.name) return _singleton def reset_storage_cache() -> None: """Drop the cached backend (tests / after an env change).""" global _singleton _singleton = None # ── module-level convenience wrappers ────────────────────────────── # Thin pass-throughs so call-sites can ``from legal_mcp.services import storage`` # and use ``await storage.put_bytes(...)`` without fetching the singleton. 
async def put_bytes(key, data, *, bucket=Bucket.DOCUMENTS, content_type=None,
                    metadata=None) -> str:
    """Store *data* under *key* via the active backend; returns its URI."""
    return await get_storage().put_bytes(
        key, data, bucket=bucket, content_type=content_type, metadata=metadata)


async def put_file(src, key, *, bucket=Bucket.DOCUMENTS, content_type=None,
                   metadata=None) -> str:
    """Store the file at *src* under *key* via the active backend."""
    return await get_storage().put_file(
        src, key, bucket=bucket, content_type=content_type, metadata=metadata)


async def get_bytes(key, *, bucket=Bucket.DOCUMENTS) -> bytes:
    """Return the contents of *key* from the active backend."""
    return await get_storage().get_bytes(key, bucket=bucket)


async def exists(key, *, bucket=Bucket.DOCUMENTS) -> bool:
    """Return whether *key* exists in the active backend."""
    return await get_storage().exists(key, bucket=bucket)


async def delete(key, *, bucket=Bucket.DOCUMENTS) -> None:
    """Delete *key* from the active backend."""
    return await get_storage().delete(key, bucket=bucket)


async def list_keys(prefix, *, bucket=Bucket.DOCUMENTS) -> list[str]:
    """Return the keys under *prefix* from the active backend."""
    return await get_storage().list_keys(prefix, bucket=bucket)


async def presign_get(key, *, bucket=Bucket.DOCUMENTS, ttl=None,
                      download_name=None) -> str:
    """Return a presigned download URL for *key* from the active backend."""
    return await get_storage().presign_get(
        key, bucket=bucket, ttl=ttl, download_name=download_name)


async def presign_put(key, *, bucket=Bucket.DOCUMENTS, ttl=None,
                      content_type=None) -> str:
    """Return a presigned upload URL for *key* from the active backend."""
    return await get_storage().presign_put(
        key, bucket=bucket, ttl=ttl, content_type=content_type)


def local_path(key, *, bucket=Bucket.DOCUMENTS) -> Path | None:
    """Return an existing on-disk path for *key*, or ``None`` (no download)."""
    return get_storage().local_path(key, bucket=bucket)


async def ensure_local(key, *, bucket=Bucket.DOCUMENTS) -> Path:
    """Return a local path for *key*, downloading to a temp file if needed."""
    return await get_storage().ensure_local(key, bucket=bucket)


# ── mirror: dual-write seal for the not-yet-read-wired pipeline (INV-STG1) ──────
# A handful of upload/finalize paths still keep a copy on disk because the
# ingest/extract pipeline reads files by their DATA_DIR path (not yet wired to
# ensure_local). For those, ``mirror``/``mirror_file`` ALSO persist the blob to
# object storage when the active backend is s3/dual — so no blob is ever missing
# from MinIO (durability + presigned serving) even though a disk copy lingers
# for the pipeline. No-op under the filesystem backend (the disk write is the
# canonical copy). Best-effort: an S3 failure is logged, never breaks the
# request (the disk copy holds). The full fix (read-wire the pipeline → drop the
# disk copy) is tracked separately; until then this closes the data-loss leak.

async def mirror(key, data, *, bucket=Bucket.DOCUMENTS, content_type=None,
                 metadata=None) -> None:
    """Best-effort copy of *data* to object storage; no-op on the filesystem backend."""
    backend = get_storage()
    if backend.name == "filesystem":
        return
    # A backend exposing ``.s3`` (dual) mirrors through it; otherwise the
    # backend itself is used as the S3 target.
    s3 = getattr(backend, "s3", backend)
    try:
        await s3.put_bytes(key, data, bucket=bucket, content_type=content_type,
                           metadata=metadata)
    except Exception as exc:  # noqa: BLE001 — log, never break the request
        logger.warning("storage.mirror: S3 persist failed for %s: %s", key, exc)


async def mirror_file(src, key, *, bucket=Bucket.DOCUMENTS, content_type=None,
                      metadata=None) -> None:
    """Best-effort copy of the file at *src* to object storage; no-op on the
    filesystem backend."""
    backend = get_storage()
    if backend.name == "filesystem":
        return
    s3 = getattr(backend, "s3", backend)
    try:
        await s3.put_file(src, key, bucket=bucket, content_type=content_type,
                          metadata=metadata)
    except Exception as exc:  # noqa: BLE001
        logger.warning("storage.mirror_file: S3 persist failed for %s: %s", key, exc)


# ── synchronous facade ─────────────────────────────────────────────
# A few legacy writers are plain sync functions (track-changes save, retrofit
# backup, the multimodal thumbnail renderer which runs in a worker thread via
# asyncio.to_thread). They go through the same layer via this blocking shim so
# INV-STG1 holds everywhere.

def _run_coro_blocking(coro):
    """Run a storage coroutine to completion from synchronous code.

    No running loop in this thread (the common case — sync helpers, or a
    to_thread worker) → ``asyncio.run``. If a loop *is* already running here,
    the coroutine is run by ``asyncio.run`` in a fresh thread and this call
    blocks on ``join()`` until it finishes, so the running loop is never
    re-entered.

    Returns the coroutine's result. An exception raised by the coroutine is
    re-raised in the caller on both paths.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return asyncio.run(coro)

    import threading

    box: dict = {}

    def _worker() -> None:
        # Capture the outcome either way: an exception escaping a thread is
        # only reported to the thread excepthook, and the caller would then
        # fail with an unrelated KeyError on the missing "value".
        try:
            box["value"] = asyncio.run(coro)
        except BaseException as exc:  # noqa: BLE001 — re-raised in the caller
            box["error"] = exc

    t = threading.Thread(target=_worker)
    t.start()
    t.join()
    if "error" in box:
        raise box["error"]
    return box["value"]


def put_bytes_sync(key, data, *, bucket=Bucket.DOCUMENTS, content_type=None,
                   metadata=None) -> str:
    """Blocking form of :func:`put_bytes`."""
    return _run_coro_blocking(
        put_bytes(key, data, bucket=bucket, content_type=content_type,
                  metadata=metadata))


def put_file_sync(src, key, *, bucket=Bucket.DOCUMENTS, content_type=None,
                  metadata=None) -> str:
    """Blocking form of :func:`put_file`."""
    return _run_coro_blocking(
        put_file(src, key, bucket=bucket, content_type=content_type,
                 metadata=metadata))


def mirror_sync(key, data, *, bucket=Bucket.DOCUMENTS, content_type=None,
                metadata=None) -> None:
    """Blocking form of :func:`mirror`."""
    _run_coro_blocking(mirror(key, data, bucket=bucket, content_type=content_type,
                              metadata=metadata))


def mirror_file_sync(src, key, *, bucket=Bucket.DOCUMENTS, content_type=None,
                     metadata=None) -> None:
    """Blocking form of :func:`mirror_file`."""
    _run_coro_blocking(mirror_file(src, key, bucket=bucket,
                                   content_type=content_type, metadata=metadata))