#!/usr/bin/env python3 """One-time migration — renumber legal-ai cases to canonical ``NNNN-MM-YY``. Adds the missing 2-digit month segment to case numbers (and, for one case, corrects the serial too). Runs on the **host** (not the Coolify container) using the app's DB pool + ``mcli`` (MinIO) + Gitea API + the Paperclip DB. Why a host script and not a UI feature: chaim chose a controlled one-time migration (11 cases) over a permanent edit feature. All relationships in the legal-ai DB are keyed by ``cases.id`` (UUID), so the FK graph is **never** touched. What breaks on a naive ``case_number`` change — and what this script migrates atomically per case — is everything that embeds the number as *text*: 1. cases.case_number UPDATE (guards unique (case_number, proceeding_type)) 2. any text column with '/cases/{old}/' generic path-segment rewrite (documents.file_path, cases.active_draft_path, *_image_thumbnail_path, draft_final_pairs.final_path, … — discovered, not hard-coded) 3. disk dir data/cases/{old} rename + case.json rewrite 4. MinIO keys cases/{old}/… 3 buckets; cp→new then rm old. legal-immutable (WORM/object-lock) → copy-only, old object stays locked. 5. Gitea repo cases/{old} API PATCH name + local .git remote rewrite 6. Paperclip project name replace(old→new) so case↔project lookup holds Bare occurrences of the old number that are NOT inside a '/cases/{old}/' path (e.g. prose in notes, a citation) are *reported for review*, never auto-edited. Usage: # dry-run (default) — inspects every subsystem, writes nothing: DOTENV_PATH=/home/chaim/.env DATA_DIR=/home/chaim/legal-ai/data \ /home/chaim/legal-ai/mcp-server/.venv/bin/python scripts/renumber_cases.py # apply one case, then verify, before doing the rest: … scripts/renumber_cases.py --apply --only 1130-25 # apply all: … scripts/renumber_cases.py --apply Flags: --apply --only --skip-minio --skip-gitea --skip-paperclip """ from __future__ import annotations import argparse import asyncio import json import shutil import subprocess import sys import urllib.error import urllib.request from datetime import datetime, timezone from pathlib import Path from legal_mcp import config from legal_mcp.services import db # ── the migration table ────────────────────────────────────────────────────── # old_cn -> new_cn. 1046-26 is a full serial correction (wrong number), not just # a month add; handled identically (it is just an old→new rename). MAPPING: dict[str, str] = { "1130-25": "1130-08-25", "1194-25": "1194-12-25", "1200-25": "1200-12-25", "8070-25": "8070-05-25", "8137-24": "8137-11-24", "8065-25": "8065-05-25", "8174-24": "8174-12-24", "1027-26": "1027-04-26", "1195-25": "1195-12-25", "1033-25": "1033-02-25", "1046-26": "1024-02-26", # serial correction } # Tier 1 — label-only (no cross-corpus identifier footprint): straightforward. # Tier 2 — archived cases whose final was ingested into the precedent + style # corpora, so the number is a cross-corpus identifier (chaim: propagate it). CLEAN_TIER = ["8137-24", "8065-25", "8174-24", "1027-26", "1195-25", "1033-25", "1046-26"] ARCHIVE_TIER = ["1130-25", "1194-25", "1200-25", "8070-25"] # Structured *identifier* columns — the number is an identity, not prose, so it # tracks the rename (chair-approved: full consistency). EXACT = whole value == old; # SUBSTR = number embedded in a free-form citation string. Everything NOT listed # here (document content, extracted_text, decision prose, notes, titles) is the # historical record and is NEVER auto-edited. IDENTIFIER_EXACT_COLS = [ ("case_law", "case_number"), ("style_corpus", "decision_number"), ("style_exemplars", "decision_number"), ("precedent_internal_citations", "cited_case_number"), ] IDENTIFIER_SUBSTR_COLS = [ ("case_precedents", "citation"), ("digests", "underlying_citation"), ] MINIO_ALIAS = "legalminio" BUCKETS = { "documents": config.MINIO_BUCKET_DOCUMENTS, "immutable": config.MINIO_BUCKET_IMMUTABLE, # WORM — copy-only "derived": config.MINIO_BUCKET_DERIVED, } GITEA_API = "https://gitea.nautilus.marcusgroup.org/api/v1" GITEA_OWNER = "cases" PAPERCLIP_DSN = "postgres://paperclip:paperclip@127.0.0.1:54329/paperclip" AUDIT_DIR = Path(config.DATA_DIR) / "audit" def _ts() -> str: return datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") def log(msg: str = "") -> None: print(msg, flush=True) # ── credential helpers (read host-local, never printed) ────────────────────── def _gitea_token() -> str | None: cred = Path.home() / ".git-credentials" if not cred.exists(): return None import re for line in cred.read_text().splitlines(): m = re.match(r"https://([^:]+):([^@]+)@gitea\.nautilus", line) if m: return m.group(2) return None # ── MinIO via mcli ─────────────────────────────────────────────────────────── def _mcli(*args: str) -> tuple[int, str]: p = subprocess.run(["mcli", *args], capture_output=True, text=True) return p.returncode, (p.stdout + p.stderr) def minio_list(bucket: str, cn: str) -> list[str]: rc, out = _mcli("ls", "--recursive", f"{MINIO_ALIAS}/{bucket}/cases/{cn}/") if rc != 0: return [] keys = [] for line in out.splitlines(): parts = line.split() if parts: keys.append(parts[-1]) return keys # ── DB discovery: text columns possibly embedding the case number ──────────── async def text_columns(conn) -> list[tuple[str, str]]: rows = await conn.fetch( """ SELECT table_name, column_name FROM information_schema.columns WHERE table_schema = 'public' AND data_type IN ('text', 'character varying', 'character') ORDER BY table_name, column_name """ ) return [(r["table_name"], r["column_name"]) for r in rows] async def scan_db_occurrences(conn, old: str) -> dict[str, list[tuple[str, str, int]]]: """Return {'path': [...], 'bare': [...]} where each item is (table, column, count). 'path' = contains 'cases/{old}/' (auto-rewritten); 'bare' = contains the number otherwise (reported for review). The needle has NO leading slash: file_path is stored absolute ('/data/cases/{cn}/…') while image_thumbnail_path is a DATA_DIR-relative storage key ('cases/{cn}/…'). 'cases/{old}/' is a substring of both.""" path_needle = f"cases/{old}/" out: dict[str, list[tuple[str, str, int]]] = {"path": [], "bare": []} for table, col in await text_columns(conn): q = f'SELECT count(*) FROM "{table}" WHERE "{col}" LIKE $1' try: n_path = await conn.fetchval(q, f"%{path_needle}%") n_any = await conn.fetchval(q, f"%{old}%") except Exception: continue if n_path: out["path"].append((table, col, n_path)) n_bare = (n_any or 0) - (n_path or 0) if n_bare > 0: out["bare"].append((table, col, n_bare)) return out # ── per-case inspection ────────────────────────────────────────────────────── async def inspect(conn, old: str, new: str) -> dict: rec: dict = {"old": old, "new": new, "ok": True, "problems": []} crow = await conn.fetchrow("SELECT * FROM cases WHERE case_number = $1", old) if not crow: rec["ok"] = False rec["problems"].append(f"case {old} not found in DB") return rec rec["case_id"] = str(crow["id"]) rec["proceeding_type"] = crow["proceeding_type"] rec["title"] = crow["title"] rec["archived"] = crow["archived_at"] is not None # collision: new must not already exist for the same proceeding_type clash = await conn.fetchval( "SELECT count(*) FROM cases WHERE case_number = $1 AND proceeding_type IS NOT DISTINCT FROM $2", new, crow["proceeding_type"], ) if clash: rec["ok"] = False rec["problems"].append(f"target {new} already exists (proceeding_type collision)") rec["db"] = await scan_db_occurrences(conn, old) # structured identifier columns that will propagate old -> new rec["identifiers"] = [] for table, col in IDENTIFIER_EXACT_COLS: try: n = await conn.fetchval(f'SELECT count(*) FROM "{table}" WHERE "{col}" = $1', old) except Exception: continue if n: clash = await conn.fetchval(f'SELECT count(*) FROM "{table}" WHERE "{col}" = $1', new) rec["identifiers"].append(("exact", table, col, n, clash)) if clash and (table, col) in (("case_law", "case_number"),): rec["problems"].append(f"{table}.{col} target {new} already exists") for table, col in IDENTIFIER_SUBSTR_COLS: try: n = await conn.fetchval(f'SELECT count(*) FROM "{table}" WHERE "{col}" LIKE $1', f"%{old}%") except Exception: continue if n: rec["identifiers"].append(("substr", table, col, n, 0)) # disk host_dir = Path(config.DATA_DIR) / "cases" / old rec["disk_dir"] = str(host_dir) rec["disk_exists"] = host_dir.exists() rec["disk_file_count"] = sum(1 for _ in host_dir.rglob("*") if _.is_file()) if host_dir.exists() else 0 new_dir = Path(config.DATA_DIR) / "cases" / new if new_dir.exists(): rec["ok"] = False rec["problems"].append(f"target disk dir already exists: {new_dir}") # minio rec["minio"] = {} for label, bucket in BUCKETS.items(): keys = minio_list(bucket, old) rec["minio"][label] = {"bucket": bucket, "count": len(keys), "keys": keys} if rec["minio"]["immutable"]["count"]: rec["problems"].append( f"WORM: {rec['minio']['immutable']['count']} sealed object(s) in " f"{BUCKETS['immutable']} — copy-only, old key stays locked" ) # gitea tok = _gitea_token() rec["gitea_repo"] = f"{GITEA_OWNER}/{old}" rec["gitea_exists"] = False if tok: try: req = urllib.request.Request( f"{GITEA_API}/repos/{GITEA_OWNER}/{old}", headers={"Authorization": f"token {tok}"}, ) urllib.request.urlopen(req, timeout=15) rec["gitea_exists"] = True except urllib.error.HTTPError: rec["gitea_exists"] = False # paperclip rec["paperclip"] = await inspect_paperclip(old) return rec async def inspect_paperclip(old: str) -> dict: import asyncpg try: c = await asyncpg.connect(PAPERCLIP_DSN, timeout=10) except Exception as e: return {"reachable": False, "error": str(e)[:120], "projects": []} try: rows = await c.fetch("SELECT id, name FROM projects WHERE name LIKE $1", f"%{old}%") return {"reachable": True, "projects": [(str(r["id"]), r["name"]) for r in rows]} finally: await c.close() # ── apply ──────────────────────────────────────────────────────────────────── async def apply_case(conn, rec: dict, *, skip_minio: bool, skip_gitea: bool, skip_paperclip: bool, propagate: bool) -> None: old, new = rec["old"], rec["new"] case_id = rec["case_id"] AUDIT_DIR.mkdir(parents=True, exist_ok=True) backup = AUDIT_DIR / f"renumber-{old}-to-{new}-{_ts()}.json" backup.write_text(json.dumps(rec, ensure_ascii=False, indent=2, default=str)) log(f" · backup → {backup}") # 1+2. DB: case_number + every path column, in one transaction async with conn.transaction(): await conn.execute("UPDATE cases SET case_number = $1, updated_at = now() WHERE id = $2::uuid", new, case_id) for table, col, _n in rec["db"]["path"]: res = await conn.execute( f'UPDATE "{table}" SET "{col}" = replace("{col}", $1, $2) WHERE "{col}" LIKE $3', f"cases/{old}/", f"cases/{new}/", f"%cases/{old}/%", ) log(f" · DB path {table}.{col}: {res}") # structured identifier propagation (chair-approved full consistency) if propagate: for kind, table, col, _n, _c in rec.get("identifiers", []): if kind == "exact": res = await conn.execute( f'UPDATE "{table}" SET "{col}" = $1 WHERE "{col}" = $2', new, old) else: res = await conn.execute( f'UPDATE "{table}" SET "{col}" = replace("{col}", $1, $2) WHERE "{col}" LIKE $3', old, new, f"%{old}%") log(f" · DB ident {table}.{col}: {res}") log(f" ✓ DB updated ({old} → {new})") # 3. disk dir + case.json host_dir = Path(config.DATA_DIR) / "cases" / old new_dir = Path(config.DATA_DIR) / "cases" / new if host_dir.exists(): host_dir.rename(new_dir) cj = new_dir / "case.json" if cj.exists(): txt = cj.read_text() txt = txt.replace(f"cases/{old}/", f"cases/{new}/").replace(f'"{old}"', f'"{new}"') try: obj = json.loads(txt) if obj.get("case_number") == old: obj["case_number"] = new cj.write_text(json.dumps(obj, ensure_ascii=False, indent=2, default=str)) except json.JSONDecodeError: cj.write_text(txt) log(f" ✓ disk {host_dir.name} → {new_dir.name}") # 4. MinIO — cp then rm (immutable: cp only) if not skip_minio: for label, bucket in BUCKETS.items(): if not rec["minio"][label]["count"]: continue src = f"{MINIO_ALIAS}/{bucket}/cases/{old}/" dst = f"{MINIO_ALIAS}/{bucket}/cases/{new}/" rc, out = _mcli("cp", "--recursive", src, dst) if rc != 0: log(f" ✗ MinIO cp {bucket} failed: {out.strip()[:200]}") continue if label == "immutable": log(f" ✓ MinIO {bucket}: copied {rec['minio'][label]['count']} (WORM — old kept locked)") else: rc2, out2 = _mcli("rm", "--recursive", "--force", src) log(f" ✓ MinIO {bucket}: copied+removed {rec['minio'][label]['count']}" + ("" if rc2 == 0 else f" (rm warn: {out2.strip()[:120]})")) # 5. Gitea repo rename + local remote rewrite if not skip_gitea and rec.get("gitea_exists"): tok = _gitea_token() new_desc = f"ערר {new} — {rec.get('title', '')}" body = json.dumps({"name": new, "description": new_desc}).encode() req = urllib.request.Request( f"{GITEA_API}/repos/{GITEA_OWNER}/{old}", data=body, method="PATCH", headers={"Authorization": f"token {tok}", "Content-Type": "application/json"}, ) try: urllib.request.urlopen(req, timeout=20) log(f" ✓ Gitea repo {GITEA_OWNER}/{old} → {new}") gitcfg = new_dir / ".git" / "config" if gitcfg.exists(): gitcfg.write_text(gitcfg.read_text().replace(f"/{old}.git", f"/{new}.git") .replace(f"/{old}\n", f"/{new}\n")) except urllib.error.HTTPError as e: log(f" ✗ Gitea rename failed: HTTP {e.code} {e.read()[:160]!r}") # 6. Paperclip project name if not skip_paperclip and rec["paperclip"].get("reachable") and rec["paperclip"]["projects"]: import asyncpg c = await asyncpg.connect(PAPERCLIP_DSN, timeout=10) try: res = await c.execute( "UPDATE projects SET name = replace(name, $1, $2), updated_at = now() WHERE name LIKE $3", old, new, f"%{old}%", ) log(f" ✓ Paperclip projects: {res}") finally: await c.close() async def verify_case(conn, old: str, new: str) -> bool: row = await conn.fetchrow("SELECT id FROM cases WHERE case_number = $1", new) gone = await conn.fetchval("SELECT count(*) FROM cases WHERE case_number = $1", old) leftover = await conn.fetchval( "SELECT count(*) FROM documents WHERE file_path LIKE $1", f"%cases/{old}/%" ) ok = bool(row) and gone == 0 and leftover == 0 log(f" verify {new}: row={'✓' if row else '✗'} old_gone={'✓' if gone == 0 else '✗'} " f"stale_doc_paths={leftover} → {'OK' if ok else 'CHECK'}") return ok # ── reporting ──────────────────────────────────────────────────────────────── def print_inspection(rec: dict) -> None: flag = "OK " if rec["ok"] else "!! " log(f"\n{flag}{rec['old']} → {rec['new']} " f"[{rec.get('proceeding_type','?')}] {'ARCH' if rec.get('archived') else 'active'} " f"— {rec.get('title','')[:40]}") if not rec["ok"]: for p in rec["problems"]: log(f" ✗ {p}") if "case_id" not in rec: return log(f" disk: {rec['disk_file_count']} files {rec['disk_dir']}" + ("" if rec["disk_exists"] else " (MISSING)")) for label in ("documents", "derived", "immutable"): m = rec["minio"][label] tag = " ⛔WORM" if label == "immutable" and m["count"] else "" log(f" minio: {m['count']:>4} {m['bucket']}{tag}") log(f" gitea: {'exists' if rec['gitea_exists'] else 'absent '} {rec['gitea_repo']}") pc = rec["paperclip"] if pc.get("reachable"): for pid, name in pc["projects"]: log(f" pclip: {name}") if not pc["projects"]: log(" pclip: (no matching project)") else: log(f" pclip: unreachable ({pc.get('error','')})") log(" DB path columns to rewrite:") for t, c, n in rec["db"]["path"]: log(f" {t}.{c} ({n})") if rec.get("identifiers"): log(" identifier columns to PROPAGATE (old→new):") for kind, t, c, n, clash in rec["identifiers"]: warn = f" ⚠ target exists ({clash})" if clash else "" log(f" {t}.{c} ({n}, {kind}){warn}") if rec["db"]["bare"]: log(" ⚠ bare-number occurrences (REVIEW — not auto-edited):") for t, c, n in rec["db"]["bare"]: log(f" {t}.{c} ({n})") for p in rec["problems"]: log(f" ⚠ {p}") async def main() -> int: ap = argparse.ArgumentParser() ap.add_argument("--apply", action="store_true", help="execute (default: dry-run)") ap.add_argument("--only", help="restrict to a single old case number") ap.add_argument("--tier", choices=["clean", "archive", "all"], default="all", help="clean=7 label-only, archive=4 with corpus footprint") ap.add_argument("--no-propagate", action="store_true", help="do NOT propagate the number to identifier corpus columns") ap.add_argument("--skip-minio", action="store_true") ap.add_argument("--skip-gitea", action="store_true") ap.add_argument("--skip-paperclip", action="store_true") args = ap.parse_args() tier = {"clean": CLEAN_TIER, "archive": ARCHIVE_TIER, "all": list(MAPPING)}[args.tier] items = [(o, n) for o, n in MAPPING.items() if o in tier] if args.only: items = [(o, n) for o, n in MAPPING.items() if o == args.only] if not items: log(f"--only {args.only}: not in mapping") return 2 pool = await db.get_pool() async with pool.acquire() as conn: recs = [await inspect(conn, old, new) for old, new in items] for rec in recs: print_inspection(rec) blocked = [r for r in recs if not r["ok"]] log(f"\n{'='*60}") log(f"{len(recs)} case(s) · {len(blocked)} blocked · " f"mode = {'APPLY' if args.apply else 'DRY-RUN'}") if blocked: log("blocked: " + ", ".join(f"{r['old']}({'; '.join(r['problems'])})" for r in blocked)) if not args.apply: log("\nDry-run only. Re-run with --apply (optionally --only ) to execute.") return 0 runnable = [r for r in recs if r["ok"]] if not runnable: log("nothing to apply.") return 1 for rec in runnable: log(f"\n── applying {rec['old']} → {rec['new']} ──") await apply_case(conn, rec, skip_minio=args.skip_minio, skip_gitea=args.skip_gitea, skip_paperclip=args.skip_paperclip, propagate=not args.no_propagate) await verify_case(conn, rec["old"], rec["new"]) log("\ndone.") return 0 if __name__ == "__main__": sys.exit(asyncio.run(main()))