feat(storage): #106.5 prereq — migrate served-but-untracked files (--untracked)
All checks were successful
G12 Leak-Guard / leak-guard (pull_request) Successful in 5s
All checks were successful
G12 Leak-Guard / leak-guard (pull_request) Successful in 5s
מצב --untracked לסקריפט ההגירה: סורק את ה-filesystem לקטגוריות שה-4 endpoints מגישים אך אינן רשומות בשום עמודת-DB (research/*, proofread/*, drafts/*, exports/*, training/proofread/*) → מעלה ל-legal-documents עם אותו key יחסי-DATA_DIR. זהו תנאי-הסף שהפאנל התלת-מודלי זיהה: בלי הקבצים האלה ב-MinIO, cutover ל-s3-only היה מחזיר 404 על הגשתם. dry-run אומת: 144 קבצים / 83.9MB, 0 חסרים, 0 outside. הפיך (העתקה אדיטיבית, דיסק שלם). refactor קטן: הלולאה הראשית עובדת על work-list אחיד (DB-tracked או filesystem-scan). invariants: G2 (אותו key/bucket scheme) · INV-STG1/3 · INV-G10 (dry-run/הפיך, אפס שינוי בייצור — רק העלאה לדליות; cutover עדיין נעול-אדם). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -61,6 +61,27 @@ BUCKET_ENV = {
|
|||||||
"immutable": config.MINIO_BUCKET_IMMUTABLE,
|
"immutable": config.MINIO_BUCKET_IMMUTABLE,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# Served-but-NOT-DB-tracked file categories (the #106.5 cutover-prerequisite the
|
||||||
|
# tri-model panel flagged): the 4 FileResponse endpoints serve these from case
|
||||||
|
# dirs, but no DB column references them, so the DB-driven pass misses them. All
|
||||||
|
# go to the documents bucket; keys are DATA_DIR-relative (same scheme). Globs are
|
||||||
|
# relative to DATA_DIR.
|
||||||
|
UNTRACKED_GLOBS = [
|
||||||
|
"cases/*/documents/research/*",
|
||||||
|
"cases/*/documents/proofread/*",
|
||||||
|
"cases/*/drafts/*",
|
||||||
|
"cases/*/exports/*",
|
||||||
|
"training/proofread/*",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def iter_untracked():
    """Yield ``(label, host_path)`` for served files not referenced by any DB column.

    Walks every pattern in ``UNTRACKED_GLOBS`` (resolved relative to
    ``DATA_DIR``) and yields each match that is a regular file; directories
    and other non-file matches are skipped.

    Yields:
        tuple: ``(pattern, host)`` where ``pattern`` is the glob string that
        produced the match (used as the source label) and ``host`` is the
        matching path under ``DATA_DIR``.
    """
    for pattern in UNTRACKED_GLOBS:
        # Path.glob gives no ordering guarantee, so sort the matches: the
        # dry-run plan and a later --apply run then walk files in the same
        # order and produce comparable output.
        for host in sorted(DATA_DIR.glob(pattern)):
            if host.is_file():
                yield pattern, host
|
||||||
|
|
||||||
|
|
||||||
def resolve_host(stored: str) -> Path | None:
|
def resolve_host(stored: str) -> Path | None:
|
||||||
"""Normalise one stored path (3 legacy formats) to a host filesystem path."""
|
"""Normalise one stored path (3 legacy formats) to a host filesystem path."""
|
||||||
@@ -94,41 +115,50 @@ async def main(args: argparse.Namespace) -> int:
|
|||||||
per_bucket: dict = {}
|
per_bucket: dict = {}
|
||||||
rows_out = []
|
rows_out = []
|
||||||
|
|
||||||
for table, col, bucket in SOURCES:
|
# Build the work-list: DB-tracked columns (default) or the filesystem scan of
|
||||||
try:
|
# served-but-untracked files (--untracked, the #106.5 cutover-prerequisite).
|
||||||
rows = await pool.fetch(
|
items: list[tuple[str, str, str, Path]] = [] # (label, stored, bucket, host)
|
||||||
f"SELECT DISTINCT {col} AS v FROM {table} WHERE COALESCE({col},'') <> ''")
|
if args.untracked:
|
||||||
except Exception as e: # noqa: BLE001
|
for label, host in iter_untracked():
|
||||||
print(f" {table}.{col}: SKIP ({str(e)[:60]})")
|
items.append((label, str(host), "documents", host))
|
||||||
continue
|
else:
|
||||||
|
for table, col, bucket in SOURCES:
|
||||||
|
try:
|
||||||
|
rows = await pool.fetch(
|
||||||
|
f"SELECT DISTINCT {col} AS v FROM {table} WHERE COALESCE({col},'') <> ''")
|
||||||
|
except Exception as e: # noqa: BLE001
|
||||||
|
print(f" {table}.{col}: SKIP ({str(e)[:60]})")
|
||||||
|
continue
|
||||||
|
for r in rows:
|
||||||
|
items.append((f"{table}.{col}", r["v"], bucket, resolve_host(r["v"])))
|
||||||
|
|
||||||
|
for label, stored, bucket, host in items:
|
||||||
b = per_bucket.setdefault(bucket, {"found": 0, "missing": 0, "bytes": 0})
|
b = per_bucket.setdefault(bucket, {"found": 0, "missing": 0, "bytes": 0})
|
||||||
for r in rows:
|
key = to_key(host) if host else None
|
||||||
host = resolve_host(r["v"])
|
if host is None or key is None:
|
||||||
key = to_key(host) if host else None
|
totals["outside"] += 1
|
||||||
if host is None or key is None:
|
rows_out.append([label, "", bucket, stored, "", "OUTSIDE_DATA_DIR", 0])
|
||||||
totals["outside"] += 1
|
continue
|
||||||
rows_out.append([table, col, bucket, r["v"], "", "OUTSIDE_DATA_DIR", 0])
|
if not host.exists():
|
||||||
continue
|
totals["missing"] += 1
|
||||||
if not host.exists():
|
b["missing"] += 1
|
||||||
totals["missing"] += 1
|
rows_out.append([label, "", bucket, stored, key, "MISSING", 0])
|
||||||
b["missing"] += 1
|
continue
|
||||||
rows_out.append([table, col, bucket, r["v"], key, "MISSING", 0])
|
size = host.stat().st_size
|
||||||
continue
|
totals["found"] += 1
|
||||||
size = host.stat().st_size
|
totals["bytes"] += size
|
||||||
totals["found"] += 1
|
b["found"] += 1
|
||||||
totals["bytes"] += size
|
b["bytes"] += size
|
||||||
b["found"] += 1
|
status = "PLANNED"
|
||||||
b["bytes"] += size
|
if args.apply:
|
||||||
status = "PLANNED"
|
ok = _upload(args.mc_alias, BUCKET_ENV[bucket], key, host, size)
|
||||||
if args.apply:
|
status = "UPLOADED" if ok else "FAILED"
|
||||||
ok = _upload(args.mc_alias, BUCKET_ENV[bucket], key, host, size)
|
totals["uploaded" if ok else "failed"] += 1
|
||||||
status = "UPLOADED" if ok else "FAILED"
|
rows_out.append([label, "", bucket, stored, key, status, size])
|
||||||
totals["uploaded" if ok else "failed"] += 1
|
|
||||||
rows_out.append([table, col, bucket, r["v"], key, status, size])
|
|
||||||
|
|
||||||
with manifest.open("w", encoding="utf-8", newline="") as f:
|
with manifest.open("w", encoding="utf-8", newline="") as f:
|
||||||
w = csv.writer(f)
|
w = csv.writer(f)
|
||||||
w.writerow(["table", "column", "bucket", "stored_path", "key", "status", "bytes"])
|
w.writerow(["source", "_", "bucket", "stored_path", "key", "status", "bytes"])
|
||||||
w.writerows(rows_out)
|
w.writerows(rows_out)
|
||||||
|
|
||||||
print(f"\n{'APPLY' if args.apply else 'DRY-RUN'} — blob migration plan")
|
print(f"\n{'APPLY' if args.apply else 'DRY-RUN'} — blob migration plan")
|
||||||
@@ -167,4 +197,7 @@ if __name__ == "__main__":
|
|||||||
formatter_class=argparse.RawDescriptionHelpFormatter)
|
formatter_class=argparse.RawDescriptionHelpFormatter)
|
||||||
ap.add_argument("--apply", action="store_true", help="upload (default: dry-run plan only)")
|
ap.add_argument("--apply", action="store_true", help="upload (default: dry-run plan only)")
|
||||||
ap.add_argument("--mc-alias", default="legalminio", help="mcli alias for MinIO")
|
ap.add_argument("--mc-alias", default="legalminio", help="mcli alias for MinIO")
|
||||||
|
ap.add_argument("--untracked", action="store_true",
|
||||||
|
help="migrate served-but-NOT-DB-tracked files (research/proofread/"
|
||||||
|
"drafts/exports) instead of the DB columns (#106.5 prerequisite)")
|
||||||
raise SystemExit(asyncio.run(main(ap.parse_args())))
|
raise SystemExit(asyncio.run(main(ap.parse_args())))
|
||||||
|
|||||||
Reference in New Issue
Block a user