feat(storage): #106.4 — DB-driven blob→MinIO migration script (dry-run default)
All checks were successful
G12 Leak-Guard / leak-guard (pull_request) Successful in 5s

הגירת בלובים מדיסק ל-MinIO, מונחית-DB ולא `mc mirror` גורף — כי ה-bucket נקבע
per-file-SEMANTIC (מסמך/טיוטה→documents, thumbnail→derived). סורק 6 עמודות-נתיב
שקיימות בפועל (documents.file_path · cases.active_draft_path · digests.source_document_path
· draft_final_pairs.final_path · document_image_embeddings/precedent_image_embeddings.
image_thumbnail_path) — לא כפי שהספ הניח (case_law.source_document_path/*_image_pages לא קיימים).

מטפל ב-3 פורמטי-נתיב legacy לא-עקביים (אומת 2026-06-11): container-abs `/data/…`,
host-abs `/home/chaim/legal-ai/data/…`, ו-relative — מנרמל ל-key יחסי-DATA_DIR (תואם
storage.normalize_key + אתרי-הכתיבה #106.3 + read-wiring העתידי #106.5). קבצים שלא
נמצאים/מחוץ-ל-DATA_DIR מדווחים, לא נבלעים.

dry-run (ברירת-מחדל): תוכנית + מניפסט CSV ל-data/audit, אפס-שינוי. --apply מעלה דרך mcli
ומאמת size אחרי כל PUT; **הדיסק לא נוגע** → re-run אידמפוטנטי וההגירה הפיכה (לרוקן דליות
+ flip חזרה ל-filesystem). נרמול עמודות-ה-DB ל-keys נקיים = צעד נפרד מאוחר (#106.5).

אומת חי (dry-run): derived 2593 (260MB) · documents 811 (638MB) · 0 outside · 28 חסרים
(רפרנסי-DB תלויים מראש). סה"כ 3404 קבצים / 899MB.

invariants: G2 (key=normalize_key, מסלול-אחסון יחיד) · INV-STG1/3 (storage layer, bucket
per-governance) · INV-G10 (dry-run/הפיך, לא נוגע בדיסק). הצעדים הבלתי-הפיכים (cutover/WORM)
נפרדים ועוצרים לאישור.
tests: dry-run חי = אימות (count+size+normalization). py_compile OK.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-11 17:19:05 +00:00
parent 671edf1128
commit 970e8dc748
2 changed files with 171 additions and 0 deletions

View File

@@ -0,0 +1,170 @@
#!/usr/bin/env python3
"""#106.4 — migrate binary blobs (PDF/DOCX/thumbnails) from disk to MinIO.
DB-DRIVEN, NOT a wholesale ``mc mirror``: the bucket is chosen per-file-semantic
(source/draft → documents, thumbnail → derived), so the migration walks the DB
path columns and uploads each referenced file to its correct (bucket, key). The
key is the DATA_DIR-relative POSIX path (storage.normalize_key), matching how the
write-wiring (#106.3) and read-wiring (#106.5) resolve keys.
⚠️ The legacy path columns are INCONSISTENT (audited 2026-06-11): three formats
coexist — container-absolute ``/data/…``, host-absolute
``/home/chaim/legal-ai/data/…``, and DATA_DIR-relative ``digests/…``. This script
normalises all three to a host path + a clean key. Files it cannot locate are
reported, never silently skipped.
Buckets (X14 §3.1):
documents → originals, drafts/exports, digests sources, finals (finals promote
to immutable only at #106.7).
derived → page thumbnails.
DRY-RUN by default: prints the full plan (per table/bucket: found / missing /
bytes) and writes a CSV manifest to data/audit/. Touches NOTHING. ``--apply``
uploads via the configured MinIO client (mcli alias from --mc-alias), verifying
size after each PUT; the disk is never modified, so a re-run is idempotent and
the migration is reversible (wipe the buckets, flip STORAGE_BACKEND back).
DB path-column normalisation to clean keys is a SEPARATE, later step (reads still
use the legacy paths until #106.5 deploys) — this script only moves bytes.
cd ~/legal-ai/mcp-server
.venv/bin/python ../scripts/migrate_blobs_to_minio.py # dry-run plan
.venv/bin/python ../scripts/migrate_blobs_to_minio.py --apply --mc-alias legalminio
"""
from __future__ import annotations
import argparse
import asyncio
import csv
import subprocess
from datetime import datetime, timezone
from pathlib import Path
from legal_mcp import config
from legal_mcp.services import db
DATA_DIR = Path(config.DATA_DIR).resolve()
_CONTAINER_DATA = "/data/" # the container bind-mount target seen in legacy rows
# (table, column, bucket) — only columns that actually exist (audited 2026-06-11).
SOURCES = [
("documents", "file_path", "documents"),
("cases", "active_draft_path", "documents"),
("digests", "source_document_path", "documents"),
("draft_final_pairs", "final_path", "documents"),
("document_image_embeddings", "image_thumbnail_path", "derived"),
("precedent_image_embeddings", "image_thumbnail_path", "derived"),
]
BUCKET_ENV = {
"documents": config.MINIO_BUCKET_DOCUMENTS,
"derived": config.MINIO_BUCKET_DERIVED,
"immutable": config.MINIO_BUCKET_IMMUTABLE,
}
def resolve_host(stored: str) -> Path | None:
"""Normalise one stored path (3 legacy formats) to a host filesystem path."""
s = (stored or "").strip()
if not s:
return None
if s.startswith(_CONTAINER_DATA): # container-absolute /data/…
return DATA_DIR / s[len(_CONTAINER_DATA):]
p = Path(s)
if p.is_absolute(): # host-absolute
return p
return DATA_DIR / s # DATA_DIR-relative
def to_key(host: Path) -> str | None:
"""DATA_DIR-relative POSIX key, or None if the file is outside DATA_DIR."""
try:
return host.resolve().relative_to(DATA_DIR).as_posix()
except ValueError:
return None
async def main(args: argparse.Namespace) -> int:
pool = await db.get_pool()
ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
audit = Path(config.DATA_DIR) / "audit"
audit.mkdir(parents=True, exist_ok=True)
manifest = audit / f"minio-migration-plan-{ts}.csv"
totals = {"found": 0, "missing": 0, "outside": 0, "bytes": 0, "uploaded": 0, "failed": 0}
per_bucket: dict = {}
rows_out = []
for table, col, bucket in SOURCES:
try:
rows = await pool.fetch(
f"SELECT DISTINCT {col} AS v FROM {table} WHERE COALESCE({col},'') <> ''")
except Exception as e: # noqa: BLE001
print(f" {table}.{col}: SKIP ({str(e)[:60]})")
continue
b = per_bucket.setdefault(bucket, {"found": 0, "missing": 0, "bytes": 0})
for r in rows:
host = resolve_host(r["v"])
key = to_key(host) if host else None
if host is None or key is None:
totals["outside"] += 1
rows_out.append([table, col, bucket, r["v"], "", "OUTSIDE_DATA_DIR", 0])
continue
if not host.exists():
totals["missing"] += 1
b["missing"] += 1
rows_out.append([table, col, bucket, r["v"], key, "MISSING", 0])
continue
size = host.stat().st_size
totals["found"] += 1
totals["bytes"] += size
b["found"] += 1
b["bytes"] += size
status = "PLANNED"
if args.apply:
ok = _upload(args.mc_alias, BUCKET_ENV[bucket], key, host, size)
status = "UPLOADED" if ok else "FAILED"
totals["uploaded" if ok else "failed"] += 1
rows_out.append([table, col, bucket, r["v"], key, status, size])
with manifest.open("w", encoding="utf-8", newline="") as f:
w = csv.writer(f)
w.writerow(["table", "column", "bucket", "stored_path", "key", "status", "bytes"])
w.writerows(rows_out)
print(f"\n{'APPLY' if args.apply else 'DRY-RUN'} — blob migration plan")
print("=" * 56)
for bucket, b in sorted(per_bucket.items()):
print(f" {bucket:10} found={b['found']:5} missing={b['missing']:4} "
f"bytes={b['bytes']/1e6:.1f}MB")
print("-" * 56)
print(f" TOTAL found={totals['found']} missing={totals['missing']} "
f"outside-DATA_DIR={totals['outside']} bytes={totals['bytes']/1e6:.1f}MB")
if args.apply:
print(f" uploaded={totals['uploaded']} failed={totals['failed']}")
print(f"\nmanifest → {manifest}")
if totals["missing"] or totals["outside"]:
print("⚠ some referenced files are missing/outside DATA_DIR — review the "
"manifest BEFORE --apply; they will not migrate.")
return 0
def _upload(alias: str, bucket: str, key: str, host: Path, size: int) -> bool:
"""Upload one file via mcli and verify the remote size. Disk untouched."""
target = f"{alias}/{bucket}/{key}"
try:
subprocess.run(["mcli", "cp", "-q", str(host), target],
check=True, capture_output=True, timeout=300)
out = subprocess.run(["mcli", "stat", "--json", target],
capture_output=True, text=True, timeout=60)
return out.returncode == 0 and str(size) in out.stdout
except Exception as e: # noqa: BLE001
print(f" upload FAILED {key}: {str(e)[:80]}")
return False
if __name__ == "__main__":
ap = argparse.ArgumentParser(description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter)
ap.add_argument("--apply", action="store_true", help="upload (default: dry-run plan only)")
ap.add_argument("--mc-alias", default="legalminio", help="mcli alias for MinIO")
raise SystemExit(asyncio.run(main(ap.parse_args())))