feat(storage): X14 Phase 1 — unified storage layer (services/storage.py)

The single choke-point for all binary file I/O (originals, derived
artifacts, exports), replacing the scattered open()/shutil/Path.write_bytes
calls across ~8 services. Backend chosen by STORAGE_BACKEND:
- filesystem (default): disk under DATA_DIR — byte-for-byte legacy behaviour
- dual: write disk + S3, read S3→disk fallback (migration window)
- s3: MinIO via aioboto3 (lazy import; absent in the filesystem path)

Keys are DATA_DIR-relative POSIX paths; the FS backend ignores the logical
bucket and keeps the existing single tree, so the default backend is zero
behaviour change. S3 maps a governance bucket (documents/immutable/derived)
→ MinIO bucket; presigned URLs are minted against the public endpoint
(browser-reachable) and carry the Hebrew filename via RFC-5987
Content-Disposition.

- config: STORAGE_BACKEND + MINIO_* (endpoint, public-endpoint, creds,
  region, 3 bucket names, presign TTL)
- mcp_env_catalog: new "storage" category + 10 specs (X10/INV-ENV1)
- pyproject: aioboto3>=13 (consumed here, deployed with first use)
- tests: 18 unit tests (FS round-trip, key normalization/traversal guard,
  bucket resolution, backend selection, dual write-both + S3-down fallback)

No call-sites are rewired yet — that is Phase 2 (106.3). STORAGE_BACKEND
stays filesystem in prod, so behaviour is unchanged.

Invariants: keeps G2 (one storage path replaces scattered I/O); establishes
INV-STG1 (single layer), INV-STG2 (atomic keys, Hebrew name in metadata),
INV-STG3 (governance buckets), INV-STG6 (presigned serving).
Spec: docs/spec/X14-storage-minio.md.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-08 07:47:49 +00:00
parent ade22ca871
commit b4a28f072d
5 changed files with 751 additions and 1 deletions

View File

@@ -202,6 +202,32 @@ EXPORTS_DIR = DATA_DIR / "exports" # legacy exports only
# Cases directory — flat structure: data/cases/{case_number}/
CASES_DIR = DATA_DIR / "cases"
# ── Object storage (X14 / MinIO) ───────────────────────────────────
# Single storage layer (services/storage.py) replaces the scattered file
# I/O across ~8 services (INV-STG1 / G2). Backend selector:
# "filesystem" (default) — disk under DATA_DIR; current behaviour, no change.
# "dual" — write disk + S3, read S3→disk fallback (migration).
# "s3" — MinIO only.
# See docs/spec/X14-storage-minio.md.
STORAGE_BACKEND = os.environ.get("STORAGE_BACKEND", "filesystem").strip().lower()
# Endpoint reached server-side (internal Docker network: http://minio:9000).
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "http://minio:9000")
# Public endpoint used when MINTING presigned URLs for the browser (INV-STG6) —
# the browser cannot resolve the internal hostname. Falls back to the internal
# endpoint when unset (e.g. local dev).
MINIO_PUBLIC_ENDPOINT = os.environ.get("MINIO_PUBLIC_ENDPOINT", MINIO_ENDPOINT)
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "")
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "")
MINIO_REGION = os.environ.get("MINIO_REGION", "us-east-1")
# Logical bucket → name. Governance boundaries (INV-STG3): documents
# (versioned), immutable (versioned + Object-Lock COMPLIANCE for final
# decisions, INV-STG4), derived (thumbnails/extracted text — regenerable).
MINIO_BUCKET_DOCUMENTS = os.environ.get("MINIO_BUCKET_DOCUMENTS", "legal-documents")
MINIO_BUCKET_IMMUTABLE = os.environ.get("MINIO_BUCKET_IMMUTABLE", "legal-immutable")
MINIO_BUCKET_DERIVED = os.environ.get("MINIO_BUCKET_DERIVED", "legal-derived")
# Default presigned-URL TTL (seconds). SigV4 hard max is 7 days; keep short.
MINIO_PRESIGN_TTL = int(os.environ.get("MINIO_PRESIGN_TTL", "900"))
def find_case_dir(case_number: str) -> Path:
"""Return the case directory for a given case number."""

View File

@@ -0,0 +1,472 @@
"""Unified object-storage layer (X14, INV-STG1).
THE single choke-point for all binary file I/O — originals, derived
artifacts (thumbnails / extracted text), and exports. It replaces the
scattered ``open()`` / ``shutil.copy`` / ``Path.write_bytes`` calls spread
across ~8 services (G2: one storage path, no parallel routes). See
docs/spec/X14-storage-minio.md.
Keys
----
A *key* is a DATA_DIR-relative POSIX path, e.g.::
cases/8174-24/documents/originals/<uuid>.pdf
precedent-library/thumbnails/<case_law_id>/p001.jpg
The filesystem backend maps ``key -> DATA_DIR / key``, preserving the exact
current on-disk layout (zero behaviour change when ``STORAGE_BACKEND`` is the
default ``filesystem``). The S3 backend maps a logical *bucket*
(documents/immutable/derived) to a MinIO bucket and uses the key verbatim as
the object key.
INV-STG2: keys are atomic ASCII/UUID paths; a Hebrew original filename is
carried as object metadata / a DB column, never as the key itself.
INV-STG5: pgvector stays the source of truth for text + embeddings — this
layer stores blobs only. INV-STG6: presigned URLs (minted against the public
endpoint) serve bytes straight to the browser.
Backends (config.STORAGE_BACKEND)
---------------------------------
- ``filesystem`` (default) — disk only; current behaviour.
- ``dual`` — write disk + S3; read S3, fall back to disk.
The migration window; disk stays authoritative.
- ``s3`` — MinIO only.
``aioboto3`` is imported lazily so this module loads even where the dependency
is absent (the default filesystem backend needs nothing extra).
"""
from __future__ import annotations
import logging
import shutil
import tempfile
from enum import Enum
from pathlib import Path, PurePosixPath
from typing import Iterable
from legal_mcp import config
logger = logging.getLogger(__name__)
class Bucket(str, Enum):
"""Logical governance buckets (INV-STG3). Resolved to MinIO bucket names
via config; ignored by the filesystem backend (which keeps one tree)."""
DOCUMENTS = "documents"
IMMUTABLE = "immutable"
DERIVED = "derived"
def _bucket_name(bucket: Bucket) -> str:
return {
Bucket.DOCUMENTS: config.MINIO_BUCKET_DOCUMENTS,
Bucket.IMMUTABLE: config.MINIO_BUCKET_IMMUTABLE,
Bucket.DERIVED: config.MINIO_BUCKET_DERIVED,
}[bucket]
def normalize_key(key: str | Path) -> str:
"""Return a clean DATA_DIR-relative POSIX key.
Rejects absolute paths and ``..`` traversal (defence in depth — keys are
built internally, never from raw user input). An absolute path under
DATA_DIR is accepted and re-relativised so call-sites can pass either a
key or a full ``Path`` during the migration.
"""
p = Path(key)
if p.is_absolute():
try:
p = p.relative_to(config.DATA_DIR)
except ValueError as exc:
raise ValueError(f"absolute path outside DATA_DIR: {key!r}") from exc
posix = PurePosixPath(p.as_posix())
parts = posix.parts
if not parts or any(part == ".." for part in parts):
raise ValueError(f"invalid storage key: {key!r}")
return posix.as_posix().lstrip("/")
class StorageBackend:
"""Abstract backend. All methods are async except the cheap path helpers."""
name = "abstract"
async def put_bytes(self, key, data, *, bucket=Bucket.DOCUMENTS,
content_type=None, metadata=None) -> str:
raise NotImplementedError
async def put_file(self, src, key, *, bucket=Bucket.DOCUMENTS,
content_type=None, metadata=None) -> str:
with open(src, "rb") as fh:
return await self.put_bytes(
key, fh.read(), bucket=bucket,
content_type=content_type, metadata=metadata,
)
async def get_bytes(self, key, *, bucket=Bucket.DOCUMENTS) -> bytes:
raise NotImplementedError
async def exists(self, key, *, bucket=Bucket.DOCUMENTS) -> bool:
raise NotImplementedError
async def delete(self, key, *, bucket=Bucket.DOCUMENTS) -> None:
raise NotImplementedError
async def list_keys(self, prefix, *, bucket=Bucket.DOCUMENTS) -> list[str]:
raise NotImplementedError
async def presign_get(self, key, *, bucket=Bucket.DOCUMENTS, ttl=None,
download_name=None) -> str:
raise NotImplementedError
async def presign_put(self, key, *, bucket=Bucket.DOCUMENTS, ttl=None,
content_type=None) -> str:
raise NotImplementedError
def local_path(self, key, *, bucket=Bucket.DOCUMENTS) -> Path | None:
"""Return a real filesystem path if one exists *without* downloading,
else ``None``. Use :meth:`ensure_local` when a path is required."""
return None
async def ensure_local(self, key, *, bucket=Bucket.DOCUMENTS) -> Path:
"""Return a local path to the object, downloading to a temp file if the
backend has no on-disk copy. Caller owns cleanup of temp files."""
path = self.local_path(key, bucket=bucket)
if path is not None:
return path
data = await self.get_bytes(key, bucket=bucket)
suffix = PurePosixPath(normalize_key(key)).suffix
tmp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
tmp.write(data)
tmp.close()
return Path(tmp.name)
class FilesystemBackend(StorageBackend):
"""Disk under DATA_DIR. ``bucket`` is ignored — the existing single tree is
preserved verbatim, so the default backend is byte-for-byte the legacy
behaviour."""
name = "filesystem"
def _abs(self, key, *, bucket=Bucket.DOCUMENTS) -> Path:
rel = normalize_key(key)
path = (Path(config.DATA_DIR) / rel).resolve()
root = Path(config.DATA_DIR).resolve()
if root not in path.parents and path != root:
raise ValueError(f"resolved path escapes DATA_DIR: {key!r}")
return path
async def put_bytes(self, key, data, *, bucket=Bucket.DOCUMENTS,
content_type=None, metadata=None) -> str:
path = self._abs(key, bucket=bucket)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_bytes(data)
return f"file://{path}"
async def put_file(self, src, key, *, bucket=Bucket.DOCUMENTS,
content_type=None, metadata=None) -> str:
path = self._abs(key, bucket=bucket)
path.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(src, path) # preserve mtime, as the legacy code did
return f"file://{path}"
async def get_bytes(self, key, *, bucket=Bucket.DOCUMENTS) -> bytes:
return self._abs(key, bucket=bucket).read_bytes()
async def exists(self, key, *, bucket=Bucket.DOCUMENTS) -> bool:
return self._abs(key, bucket=bucket).exists()
async def delete(self, key, *, bucket=Bucket.DOCUMENTS) -> None:
self._abs(key, bucket=bucket).unlink(missing_ok=True)
async def list_keys(self, prefix, *, bucket=Bucket.DOCUMENTS) -> list[str]:
root = Path(config.DATA_DIR).resolve()
base = self._abs(prefix, bucket=bucket) if prefix else root
if not base.exists():
return []
out: list[str] = []
for p in sorted(base.rglob("*")):
if p.is_file():
out.append(p.resolve().relative_to(root).as_posix())
return out
def local_path(self, key, *, bucket=Bucket.DOCUMENTS) -> Path | None:
path = self._abs(key, bucket=bucket)
return path if path.exists() else None
async def presign_get(self, key, *, bucket=Bucket.DOCUMENTS, ttl=None,
download_name=None) -> str:
raise NotImplementedError(
"presigned URLs require the S3 backend (set STORAGE_BACKEND=dual|s3)"
)
async def presign_put(self, key, *, bucket=Bucket.DOCUMENTS, ttl=None,
content_type=None) -> str:
raise NotImplementedError(
"presigned URLs require the S3 backend (set STORAGE_BACKEND=dual|s3)"
)
class S3Backend(StorageBackend):
"""MinIO via aioboto3. Server-side ops use the internal endpoint; presigned
URLs are minted against the public endpoint so the browser can reach them
(INV-STG6)."""
name = "s3"
def __init__(self) -> None:
self._session = None # lazy aioboto3 session
def _boto(self):
import aioboto3 # lazy — absent in the default filesystem path
from botocore.config import Config as BotoConfig
if self._session is None:
self._session = aioboto3.Session()
cfg = BotoConfig(signature_version="s3v4", s3={"addressing_style": "path"})
return aioboto3, BotoConfig, cfg
def _client(self, *, public: bool = False):
_aioboto3, _BotoConfig, cfg = self._boto()
endpoint = config.MINIO_PUBLIC_ENDPOINT if public else config.MINIO_ENDPOINT
return self._session.client(
"s3",
endpoint_url=endpoint,
aws_access_key_id=config.MINIO_ACCESS_KEY,
aws_secret_access_key=config.MINIO_SECRET_KEY,
region_name=config.MINIO_REGION,
config=cfg,
)
async def put_bytes(self, key, data, *, bucket=Bucket.DOCUMENTS,
content_type=None, metadata=None) -> str:
k = normalize_key(key)
extra = {}
if content_type:
extra["ContentType"] = content_type
if metadata:
extra["Metadata"] = {kk: str(vv) for kk, vv in metadata.items()}
async with self._client() as s3:
await s3.put_object(Bucket=_bucket_name(bucket), Key=k, Body=data, **extra)
return f"s3://{_bucket_name(bucket)}/{k}"
async def get_bytes(self, key, *, bucket=Bucket.DOCUMENTS) -> bytes:
k = normalize_key(key)
async with self._client() as s3:
resp = await s3.get_object(Bucket=_bucket_name(bucket), Key=k)
async with resp["Body"] as stream:
return await stream.read()
async def exists(self, key, *, bucket=Bucket.DOCUMENTS) -> bool:
from botocore.exceptions import ClientError
k = normalize_key(key)
async with self._client() as s3:
try:
await s3.head_object(Bucket=_bucket_name(bucket), Key=k)
return True
except ClientError as exc:
if exc.response["Error"]["Code"] in ("404", "NoSuchKey", "NotFound"):
return False
raise
async def delete(self, key, *, bucket=Bucket.DOCUMENTS) -> None:
k = normalize_key(key)
async with self._client() as s3:
await s3.delete_object(Bucket=_bucket_name(bucket), Key=k)
async def list_keys(self, prefix, *, bucket=Bucket.DOCUMENTS) -> list[str]:
pfx = normalize_key(prefix) if prefix else ""
out: list[str] = []
async with self._client() as s3:
paginator = s3.get_paginator("list_objects_v2")
async for page in paginator.paginate(Bucket=_bucket_name(bucket), Prefix=pfx):
for obj in page.get("Contents", []):
out.append(obj["Key"])
return out
async def presign_get(self, key, *, bucket=Bucket.DOCUMENTS, ttl=None,
download_name=None) -> str:
k = normalize_key(key)
params = {"Bucket": _bucket_name(bucket), "Key": k}
if download_name:
# RFC 5987 — keep the Hebrew original filename on download (INV-STG2)
from urllib.parse import quote
params["ResponseContentDisposition"] = (
f"attachment; filename*=UTF-8''{quote(download_name)}"
)
async with self._client(public=True) as s3:
return await s3.generate_presigned_url(
"get_object", Params=params,
ExpiresIn=ttl or config.MINIO_PRESIGN_TTL,
)
async def presign_put(self, key, *, bucket=Bucket.DOCUMENTS, ttl=None,
content_type=None) -> str:
k = normalize_key(key)
params = {"Bucket": _bucket_name(bucket), "Key": k}
if content_type:
params["ContentType"] = content_type
async with self._client(public=True) as s3:
return await s3.generate_presigned_url(
"put_object", Params=params,
ExpiresIn=ttl or config.MINIO_PRESIGN_TTL,
)
class DualBackend(StorageBackend):
"""Migration window: writes go to BOTH disk and S3 (disk authoritative);
reads prefer S3 and fall back to disk. An S3 write failure is logged (never
swallowed — engineering §6) but does not break the app while disk holds the
canonical copy."""
name = "dual"
def __init__(self) -> None:
self.fs = FilesystemBackend()
self.s3 = S3Backend()
async def put_bytes(self, key, data, *, bucket=Bucket.DOCUMENTS,
content_type=None, metadata=None) -> str:
uri = await self.fs.put_bytes(key, data, bucket=bucket,
content_type=content_type, metadata=metadata)
try:
await self.s3.put_bytes(key, data, bucket=bucket,
content_type=content_type, metadata=metadata)
except Exception as exc: # noqa: BLE001 — log, don't swallow
logger.warning("dual put_bytes: S3 mirror failed for %s: %s", key, exc)
return uri
async def put_file(self, src, key, *, bucket=Bucket.DOCUMENTS,
content_type=None, metadata=None) -> str:
uri = await self.fs.put_file(src, key, bucket=bucket,
content_type=content_type, metadata=metadata)
try:
await self.s3.put_file(src, key, bucket=bucket,
content_type=content_type, metadata=metadata)
except Exception as exc: # noqa: BLE001
logger.warning("dual put_file: S3 mirror failed for %s: %s", key, exc)
return uri
async def get_bytes(self, key, *, bucket=Bucket.DOCUMENTS) -> bytes:
try:
return await self.s3.get_bytes(key, bucket=bucket)
except Exception as exc: # noqa: BLE001 — fall back to disk
logger.debug("dual get_bytes: S3 miss for %s (%s); using disk", key, exc)
return await self.fs.get_bytes(key, bucket=bucket)
async def exists(self, key, *, bucket=Bucket.DOCUMENTS) -> bool:
if await self.fs.exists(key, bucket=bucket):
return True
try:
return await self.s3.exists(key, bucket=bucket)
except Exception: # noqa: BLE001
return False
async def delete(self, key, *, bucket=Bucket.DOCUMENTS) -> None:
await self.fs.delete(key, bucket=bucket)
try:
await self.s3.delete(key, bucket=bucket)
except Exception as exc: # noqa: BLE001
logger.warning("dual delete: S3 delete failed for %s: %s", key, exc)
async def list_keys(self, prefix, *, bucket=Bucket.DOCUMENTS) -> list[str]:
return await self.fs.list_keys(prefix, bucket=bucket)
def local_path(self, key, *, bucket=Bucket.DOCUMENTS) -> Path | None:
return self.fs.local_path(key, bucket=bucket)
async def presign_get(self, key, *, bucket=Bucket.DOCUMENTS, ttl=None,
download_name=None) -> str:
return await self.s3.presign_get(key, bucket=bucket, ttl=ttl,
download_name=download_name)
async def presign_put(self, key, *, bucket=Bucket.DOCUMENTS, ttl=None,
content_type=None) -> str:
return await self.s3.presign_put(key, bucket=bucket, ttl=ttl,
content_type=content_type)
_BACKENDS = {
"filesystem": FilesystemBackend,
"dual": DualBackend,
"s3": S3Backend,
}
_singleton: StorageBackend | None = None
def get_storage() -> StorageBackend:
"""Return the process-wide storage backend selected by config.STORAGE_BACKEND
(cached). Unknown values fall back to ``filesystem`` with a warning rather
than crashing the app."""
global _singleton
if _singleton is None:
cls = _BACKENDS.get(config.STORAGE_BACKEND)
if cls is None:
logger.warning(
"unknown STORAGE_BACKEND=%r — falling back to filesystem",
config.STORAGE_BACKEND,
)
cls = FilesystemBackend
_singleton = cls()
logger.info("storage backend = %s", _singleton.name)
return _singleton
def reset_storage_cache() -> None:
"""Drop the cached backend (tests / after an env change)."""
global _singleton
_singleton = None
# ── module-level convenience wrappers ──────────────────────────────
# Thin pass-throughs so call-sites can ``from legal_mcp.services import storage``
# and use ``await storage.put_bytes(...)`` without fetching the singleton.
async def put_bytes(key, data, *, bucket=Bucket.DOCUMENTS, content_type=None,
metadata=None) -> str:
return await get_storage().put_bytes(
key, data, bucket=bucket, content_type=content_type, metadata=metadata)
async def put_file(src, key, *, bucket=Bucket.DOCUMENTS, content_type=None,
metadata=None) -> str:
return await get_storage().put_file(
src, key, bucket=bucket, content_type=content_type, metadata=metadata)
async def get_bytes(key, *, bucket=Bucket.DOCUMENTS) -> bytes:
return await get_storage().get_bytes(key, bucket=bucket)
async def exists(key, *, bucket=Bucket.DOCUMENTS) -> bool:
return await get_storage().exists(key, bucket=bucket)
async def delete(key, *, bucket=Bucket.DOCUMENTS) -> None:
return await get_storage().delete(key, bucket=bucket)
async def list_keys(prefix, *, bucket=Bucket.DOCUMENTS) -> list[str]:
return await get_storage().list_keys(prefix, bucket=bucket)
async def presign_get(key, *, bucket=Bucket.DOCUMENTS, ttl=None,
download_name=None) -> str:
return await get_storage().presign_get(
key, bucket=bucket, ttl=ttl, download_name=download_name)
async def presign_put(key, *, bucket=Bucket.DOCUMENTS, ttl=None,
content_type=None) -> str:
return await get_storage().presign_put(
key, bucket=bucket, ttl=ttl, content_type=content_type)
def local_path(key, *, bucket=Bucket.DOCUMENTS) -> Path | None:
return get_storage().local_path(key, bucket=bucket)
async def ensure_local(key, *, bucket=Bucket.DOCUMENTS) -> Path:
return await get_storage().ensure_local(key, bucket=bucket)