#!/usr/bin/env python3
"""INV-STG1 runtime tripwire — detect blobs that leaked to the old disk folders
without reaching MinIO (the detective control complementing the CI leak-guard).

After the s3-only cutover, every blob written under
DATA_DIR/{cases,precedent-library,internal-decisions,digests,training} should
ALSO be in MinIO (the upload/finalize paths keep a disk copy for the pipeline
but mirror to S3 via storage.mirror — see web/app.py _seal_blob). A file present
on disk but ABSENT from the matching S3 bucket means a write bypassed the seal
→ it would be lost on disk cleanup and is not served/backed-up. This script
reports them.

Classifies disk files into documents/derived buckets exactly like the migration
(``*/extracted/*`` and ``*/thumbnails/*`` → legal-derived; the rest →
legal-documents) and compares against the live bucket key-sets (proper JSON key
match, so Hebrew filenames with spaces compare correctly). Read-only.

Exit status: 0 = no leaks, 1 = leaks found, 2 = the scan could not run
(bad --since value, or the bucket listing failed).

Run locally (needs the `legalminio` mcli alias):
 python3 scripts/storage_leak_tripwire.py # full scan
 python3 scripts/storage_leak_tripwire.py --since 2026-06-11 # only newer files
"""
from __future__ import annotations

import argparse
import datetime
import json
import subprocess
import sys
from pathlib import Path

MCLI = str(Path.home() / ".local" / "bin" / "mcli")
DATA = Path("/home/chaim/legal-ai/data")
CATS = ["cases", "precedent-library", "internal-decisions", "digests",
        "training"]
# non-blob disk files that legitimately stay on disk / in git-per-case
SKIP_SUFFIX = {".tmp", ".log"}
SKIP_NAME = {"case.json", "notes.md", ".pull.log"}

# Buckets the disk tree is compared against (see _bucket_for).
_BUCKETS = ("legal-documents", "legal-derived")
# Cap on the number of leaked paths printed individually in the report.
_MAX_REPORTED = 50


def _bucket_for(rel: str) -> str:
    """Return the bucket name a DATA-relative POSIX path is expected in.

    Paths containing an ``/extracted/`` or ``/thumbnails/`` component map to
    ``legal-derived``; everything else maps to ``legal-documents``.
    """
    return ("legal-derived"
            if ("/extracted/" in rel or "/thumbnails/" in rel)
            else "legal-documents")


def _parse_keys(listing: str) -> set[str]:
    """Extract object keys from ``mcli ls --json`` output (one JSON doc/line).

    Lines that are not valid JSON, are not JSON objects, or carry no non-empty
    string ``key`` are ignored, as are keys inside a ``/.git/`` directory.
    """
    keys: set[str] = set()
    for line in listing.splitlines():
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue
        if not isinstance(entry, dict):
            # Valid JSON but not an object (e.g. a bare number) has no key.
            continue
        key = entry.get("key", "")
        if isinstance(key, str) and key and "/.git/" not in key:
            keys.add(key)
    return keys


def _s3_keys(bucket: str) -> set[str]:
    """Return the set of object keys currently listed in ``legalminio/<bucket>``.

    Raises:
        RuntimeError: if ``mcli`` cannot be started or exits non-zero. Without
            this check a failed listing would look like an empty bucket and
            every disk file would be reported as leaked.
    """
    cmd = [MCLI, "ls", "--recursive", "--json", f"legalminio/{bucket}"]
    try:
        # Decode explicitly as UTF-8 so non-ASCII (e.g. Hebrew) keys do not
        # depend on the locale this script happens to run under.
        out = subprocess.run(cmd, capture_output=True, encoding="utf-8",
                             env={"TERM": "xterm", "HOME": str(Path.home())})
    except OSError as err:
        raise RuntimeError(f"cannot run {MCLI}: {err}") from err
    if out.returncode != 0:
        detail = (out.stderr or out.stdout or "").strip()
        raise RuntimeError(
            f"listing legalminio/{bucket} failed "
            f"(exit {out.returncode}): {detail}")
    return _parse_keys(out.stdout)


def _since_timestamp(value: str | None) -> float | None:
    """Convert an ISO date/datetime string to a POSIX timestamp.

    Returns ``None`` when *value* is empty or ``None`` (no cutoff).

    Raises:
        ValueError: if *value* is not accepted by ``datetime.fromisoformat``.
    """
    if not value:
        return None
    return datetime.datetime.fromisoformat(value).timestamp()


def _is_candidate(path: Path) -> bool:
    """Return True if *path* should be checked against MinIO.

    Excludes anything under a ``.git`` directory and the non-blob files named
    by ``SKIP_SUFFIX`` / ``SKIP_NAME``. Does not touch the filesystem.
    """
    if "/.git/" in path.as_posix():
        return False
    return path.suffix not in SKIP_SUFFIX and path.name not in SKIP_NAME


def _disk_blobs(since: float | None):
    """Yield DATA-relative POSIX paths of the disk blobs to check.

    Walks every existing category directory under ``DATA``. When *since* is not
    ``None``, files whose mtime is older than that timestamp are skipped.
    """
    for cat in CATS:
        root = DATA / cat
        if not root.exists():
            continue
        for path in root.rglob("*"):
            if not path.is_file() or not _is_candidate(path):
                continue
            if since is not None:
                try:
                    mtime = path.stat().st_mtime
                except OSError:
                    # File disappeared between the walk and the stat (e.g. a
                    # temp file renamed by a running pipeline) — nothing to do.
                    continue
                if mtime < since:
                    continue
            yield path.relative_to(DATA).as_posix()


def main(args) -> int:
    """Run the scan and print a report.

    Args:
        args: parsed CLI namespace; only ``args.since`` (ISO date string or
            ``None``) is read.

    Returns:
        0 if every scanned disk blob is present in its bucket, 1 if any are
        missing, 2 if the scan could not be performed.
    """
    try:
        since = _since_timestamp(args.since)
    except ValueError as err:
        print(f"error: invalid --since value {args.since!r}: {err}",
              file=sys.stderr)
        return 2

    try:
        s3 = {bucket: _s3_keys(bucket) for bucket in _BUCKETS}
    except RuntimeError as err:
        print(f"error: {err}", file=sys.stderr)
        return 2

    leaked: list[str] = []
    scanned = 0
    for rel in _disk_blobs(since):
        scanned += 1
        if rel not in s3[_bucket_for(rel)]:
            leaked.append(rel)

    print(f"scanned {scanned} disk blobs across {CATS}")
    if not leaked:
        print("✓ no leaks — every disk blob is present in MinIO.")
        return 0

    print(f"⚠ {len(leaked)} LEAKED blobs (on disk, NOT in MinIO):")
    for rel in leaked[:_MAX_REPORTED]:
        print(f" {rel} → expected in {_bucket_for(rel)}")
    if len(leaked) > _MAX_REPORTED:
        print(f" … and {len(leaked) - _MAX_REPORTED} more")
    return 1


if __name__ == "__main__":
    ap = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    ap.add_argument("--since",
                    help="ISO date — only check files modified on/after")
    sys.exit(main(ap.parse_args()))