All checks were successful
G12 Leak-Guard / leak-guard (pull_request) Successful in 5s
מצב --untracked לסקריפט ההגירה: סורק את ה-filesystem לקטגוריות שה-4 endpoints מגישים אך אינן רשומות בשום עמודת-DB (research/*, proofread/*, drafts/*, exports/*, training/ proofread/*) → מעלה ל-legal-documents עם אותו key יחסי-DATA_DIR. זהו תנאי-הסף שהפאנל התלת-מודלי זיהה: בלי הקבצים האלה ב-MinIO, cutover ל-s3-only היה מחזיר 404 על הגשתם. dry-run אומת: 144 קבצים / 83.9MB, 0 חסרים, 0 outside. הפיך (העתקה אדיטיבית, דיסק שלם). refactor קטן: הלולאה הראשית עובדת על work-list אחיד (DB-tracked או filesystem-scan). invariants: G2 (אותו key/bucket scheme) · INV-STG1/3 · INV-G10 (dry-run/הפיך, אפס שינוי בייצור — רק העלאה לדליות; cutover עדיין נעול-אדם). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
204 lines
8.6 KiB
Python
204 lines
8.6 KiB
Python
#!/usr/bin/env python3
|
|
"""#106.4 — migrate binary blobs (PDF/DOCX/thumbnails) from disk to MinIO.
|
|
|
|
DB-DRIVEN, NOT a wholesale ``mc mirror``: the bucket is chosen per-file-semantic
|
|
(source/draft → documents, thumbnail → derived), so the migration walks the DB
|
|
path columns and uploads each referenced file to its correct (bucket, key). The
|
|
key is the DATA_DIR-relative POSIX path (storage.normalize_key), matching how the
|
|
write-wiring (#106.3) and read-wiring (#106.5) resolve keys.
|
|
|
|
⚠️ The legacy path columns are INCONSISTENT (audited 2026-06-11): three formats
|
|
coexist — container-absolute ``/data/…``, host-absolute
|
|
``/home/chaim/legal-ai/data/…``, and DATA_DIR-relative ``digests/…``. This script
|
|
normalises all three to a host path + a clean key. Files it cannot locate are
|
|
reported, never silently skipped.
|
|
|
|
Buckets (X14 §3.1):
|
|
documents → originals, drafts/exports, digests sources, finals (finals promote
|
|
to immutable only at #106.7).
|
|
derived → page thumbnails.
|
|
|
|
DRY-RUN by default: prints the full plan (per table/bucket: found / missing /
|
|
bytes) and writes a CSV manifest to data/audit/. Touches NOTHING. ``--apply``
|
|
uploads via the configured MinIO client (mcli alias from --mc-alias), verifying
|
|
size after each PUT; the disk is never modified, so a re-run is idempotent and
|
|
the migration is reversible (wipe the buckets, flip STORAGE_BACKEND back).
|
|
|
|
DB path-column normalisation to clean keys is a SEPARATE, later step (reads still
|
|
use the legacy paths until #106.5 deploys) — this script only moves bytes.
|
|
|
|
cd ~/legal-ai/mcp-server
|
|
.venv/bin/python ../scripts/migrate_blobs_to_minio.py # dry-run plan
|
|
.venv/bin/python ../scripts/migrate_blobs_to_minio.py --apply --mc-alias legalminio
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import asyncio
|
|
import csv
|
|
import subprocess
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
from legal_mcp import config
|
|
from legal_mcp.services import db
|
|
|
|
DATA_DIR = Path(config.DATA_DIR).resolve()
|
|
_CONTAINER_DATA = "/data/" # the container bind-mount target seen in legacy rows
|
|
|
|
# (table, column, bucket) — only columns that actually exist (audited 2026-06-11).
|
|
SOURCES = [
|
|
("documents", "file_path", "documents"),
|
|
("cases", "active_draft_path", "documents"),
|
|
("digests", "source_document_path", "documents"),
|
|
("draft_final_pairs", "final_path", "documents"),
|
|
("document_image_embeddings", "image_thumbnail_path", "derived"),
|
|
("precedent_image_embeddings", "image_thumbnail_path", "derived"),
|
|
]
|
|
BUCKET_ENV = {
|
|
"documents": config.MINIO_BUCKET_DOCUMENTS,
|
|
"derived": config.MINIO_BUCKET_DERIVED,
|
|
"immutable": config.MINIO_BUCKET_IMMUTABLE,
|
|
}
|
|
|
|
# Served-but-NOT-DB-tracked file categories (the #106.5 cutover-prerequisite the
|
|
# tri-model panel flagged): the 4 FileResponse endpoints serve these from case
|
|
# dirs, but no DB column references them, so the DB-driven pass misses them. All
|
|
# go to the documents bucket; keys are DATA_DIR-relative (same scheme). Globs are
|
|
# relative to DATA_DIR.
|
|
UNTRACKED_GLOBS = [
|
|
"cases/*/documents/research/*",
|
|
"cases/*/documents/proofread/*",
|
|
"cases/*/drafts/*",
|
|
"cases/*/exports/*",
|
|
"training/proofread/*",
|
|
]
|
|
|
|
|
|
def iter_untracked():
|
|
"""Yield (label, host_path) for served files not referenced by any DB column."""
|
|
for pattern in UNTRACKED_GLOBS:
|
|
for host in DATA_DIR.glob(pattern):
|
|
if host.is_file():
|
|
yield pattern, host
|
|
|
|
|
|
def resolve_host(stored: str) -> Path | None:
|
|
"""Normalise one stored path (3 legacy formats) to a host filesystem path."""
|
|
s = (stored or "").strip()
|
|
if not s:
|
|
return None
|
|
if s.startswith(_CONTAINER_DATA): # container-absolute /data/…
|
|
return DATA_DIR / s[len(_CONTAINER_DATA):]
|
|
p = Path(s)
|
|
if p.is_absolute(): # host-absolute
|
|
return p
|
|
return DATA_DIR / s # DATA_DIR-relative
|
|
|
|
|
|
def to_key(host: Path) -> str | None:
|
|
"""DATA_DIR-relative POSIX key, or None if the file is outside DATA_DIR."""
|
|
try:
|
|
return host.resolve().relative_to(DATA_DIR).as_posix()
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
async def main(args: argparse.Namespace) -> int:
|
|
pool = await db.get_pool()
|
|
ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
|
|
audit = Path(config.DATA_DIR) / "audit"
|
|
audit.mkdir(parents=True, exist_ok=True)
|
|
manifest = audit / f"minio-migration-plan-{ts}.csv"
|
|
|
|
totals = {"found": 0, "missing": 0, "outside": 0, "bytes": 0, "uploaded": 0, "failed": 0}
|
|
per_bucket: dict = {}
|
|
rows_out = []
|
|
|
|
# Build the work-list: DB-tracked columns (default) or the filesystem scan of
|
|
# served-but-untracked files (--untracked, the #106.5 cutover-prerequisite).
|
|
items: list[tuple[str, str, str, Path]] = [] # (label, stored, bucket, host)
|
|
if args.untracked:
|
|
for label, host in iter_untracked():
|
|
items.append((label, str(host), "documents", host))
|
|
else:
|
|
for table, col, bucket in SOURCES:
|
|
try:
|
|
rows = await pool.fetch(
|
|
f"SELECT DISTINCT {col} AS v FROM {table} WHERE COALESCE({col},'') <> ''")
|
|
except Exception as e: # noqa: BLE001
|
|
print(f" {table}.{col}: SKIP ({str(e)[:60]})")
|
|
continue
|
|
for r in rows:
|
|
items.append((f"{table}.{col}", r["v"], bucket, resolve_host(r["v"])))
|
|
|
|
for label, stored, bucket, host in items:
|
|
b = per_bucket.setdefault(bucket, {"found": 0, "missing": 0, "bytes": 0})
|
|
key = to_key(host) if host else None
|
|
if host is None or key is None:
|
|
totals["outside"] += 1
|
|
rows_out.append([label, "", bucket, stored, "", "OUTSIDE_DATA_DIR", 0])
|
|
continue
|
|
if not host.exists():
|
|
totals["missing"] += 1
|
|
b["missing"] += 1
|
|
rows_out.append([label, "", bucket, stored, key, "MISSING", 0])
|
|
continue
|
|
size = host.stat().st_size
|
|
totals["found"] += 1
|
|
totals["bytes"] += size
|
|
b["found"] += 1
|
|
b["bytes"] += size
|
|
status = "PLANNED"
|
|
if args.apply:
|
|
ok = _upload(args.mc_alias, BUCKET_ENV[bucket], key, host, size)
|
|
status = "UPLOADED" if ok else "FAILED"
|
|
totals["uploaded" if ok else "failed"] += 1
|
|
rows_out.append([label, "", bucket, stored, key, status, size])
|
|
|
|
with manifest.open("w", encoding="utf-8", newline="") as f:
|
|
w = csv.writer(f)
|
|
w.writerow(["source", "_", "bucket", "stored_path", "key", "status", "bytes"])
|
|
w.writerows(rows_out)
|
|
|
|
print(f"\n{'APPLY' if args.apply else 'DRY-RUN'} — blob migration plan")
|
|
print("=" * 56)
|
|
for bucket, b in sorted(per_bucket.items()):
|
|
print(f" {bucket:10} found={b['found']:5} missing={b['missing']:4} "
|
|
f"bytes={b['bytes']/1e6:.1f}MB")
|
|
print("-" * 56)
|
|
print(f" TOTAL found={totals['found']} missing={totals['missing']} "
|
|
f"outside-DATA_DIR={totals['outside']} bytes={totals['bytes']/1e6:.1f}MB")
|
|
if args.apply:
|
|
print(f" uploaded={totals['uploaded']} failed={totals['failed']}")
|
|
print(f"\nmanifest → {manifest}")
|
|
if totals["missing"] or totals["outside"]:
|
|
print("⚠ some referenced files are missing/outside DATA_DIR — review the "
|
|
"manifest BEFORE --apply; they will not migrate.")
|
|
return 0
|
|
|
|
|
|
def _upload(alias: str, bucket: str, key: str, host: Path, size: int) -> bool:
|
|
"""Upload one file via mcli and verify the remote size. Disk untouched."""
|
|
target = f"{alias}/{bucket}/{key}"
|
|
try:
|
|
subprocess.run(["mcli", "cp", "-q", str(host), target],
|
|
check=True, capture_output=True, timeout=300)
|
|
out = subprocess.run(["mcli", "stat", "--json", target],
|
|
capture_output=True, text=True, timeout=60)
|
|
return out.returncode == 0 and str(size) in out.stdout
|
|
except Exception as e: # noqa: BLE001
|
|
print(f" upload FAILED {key}: {str(e)[:80]}")
|
|
return False
|
|
|
|
|
|
if __name__ == "__main__":
|
|
ap = argparse.ArgumentParser(description=__doc__,
|
|
formatter_class=argparse.RawDescriptionHelpFormatter)
|
|
ap.add_argument("--apply", action="store_true", help="upload (default: dry-run plan only)")
|
|
ap.add_argument("--mc-alias", default="legalminio", help="mcli alias for MinIO")
|
|
ap.add_argument("--untracked", action="store_true",
|
|
help="migrate served-but-NOT-DB-tracked files (research/proofread/"
|
|
"drafts/exports) instead of the DB columns (#106.5 prerequisite)")
|
|
raise SystemExit(asyncio.run(main(ap.parse_args())))
|