#!/usr/bin/env python3
r"""FU-2b — reconcile internal_committee case_number → canonical bare number.

Rewrites case_number values that currently hold a full citation into the
canonical normalized bare number (X1: trim · prefix-strip · '/'→'-', month
preserved). citation_formatted is the display field and is left untouched.

DETERMINISTIC — no LLM. Extraction takes the single case-number-shaped token
from the value; 0 or >1 tokens are flagged for chair review, never guessed.

Usage (must use the mcp-server venv — asyncpg/pgvector vendored there):
    PY=/home/chaim/legal-ai/mcp-server/.venv/bin/python

    # Dry-run (default): builds the reconciliation table for chair review.
    $PY scripts/fu2b_reconcile_internal_case_numbers.py

    # Apply ONLY the chair-approved rows (after Dafna's review), backup first:
    $PY scripts/fu2b_reconcile_internal_case_numbers.py --apply \
        --approved data/audit/fu2b-approved-<ts>.csv

Scope: source_kind='internal_committee' only (external → #68/FU-2c).
FK-safe: all case_law FKs reference case_law.id (UUID), not case_number.
"""
from __future__ import annotations

import argparse
import asyncio
import csv
import os
import re
import sys
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import quote

REPO_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(REPO_ROOT / "mcp-server" / "src"))


def _default_postgres_url() -> str:
    """Build the fallback POSTGRES_URL from the individual POSTGRES_* variables.

    User and password are percent-encoded so that characters such as '@', '/'
    or ':' in a credential cannot change how the URL is parsed.
    """
    user = quote(os.environ.get("POSTGRES_USER", "legal_ai"), safe="")
    password = quote(os.environ.get("POSTGRES_PASSWORD", ""), safe="")
    host = os.environ.get("POSTGRES_HOST", "127.0.0.1")
    port = os.environ.get("POSTGRES_PORT", "5433")
    database = os.environ.get("POSTGRES_DB", "legal_ai")
    return f"postgres://{user}:{password}@{host}:{port}/{database}"


# Must be set before legal_mcp.services.db is imported (done lazily below).
if "POSTGRES_URL" not in os.environ:
    os.environ["POSTGRES_URL"] = _default_postgres_url()

AUDIT_DIR = REPO_ROOT / "data" / "audit"

# A maximal run of digit groups joined by '-' or '/' (e.g. "1234/21", "2021").
_NUMBER_RUN_RE = re.compile(r"[0-9]+(?:[-/][0-9]+)*")
# The accepted case-number shape: NNNN/YY or NNNN-MM-YY. Applied with
# fullmatch() to a whole run, so a longer number is never silently truncated.
_TOKEN_RE = re.compile(r"[0-9]{2,6}(?:[-/][0-9]{1,2}){1,2}")


def _extract_bare(case_number: str) -> tuple[str | None, str]:
    """Return (canonical_bare, flag). flag ∈ {OK, NO_NUMBER, MULTI_NUMBER}.

    Deterministic: looks at every maximal digit run that contains a '-' or '/'
    separator (plain digit runs such as a section number are ignored).

    * exactly one run, and the WHOLE run has the case-number shape
      (NNNN/YY or NNNN-MM-YY) → normalize '/'→'-' (month preserved, none
      invented) and return it with "OK";
    * no such run, or a single run that does not fit the shape (e.g. a
      four-digit year "1234/2021" or a date "12/05/2021") → (None, "NO_NUMBER");
    * more than one run → (None, "MULTI_NUMBER").

    Anything other than "OK" is left for the chair to decide; never guessed.
    """
    runs = [
        run
        for run in _NUMBER_RUN_RE.findall(case_number or "")
        if "-" in run or "/" in run
    ]
    if not runs:
        return None, "NO_NUMBER"
    if len(runs) > 1:
        return None, "MULTI_NUMBER"
    (only,) = runs
    # Matching the whole run (not a prefix of it) is what prevents
    # "1234/2021" from being accepted as "1234-20".
    if _TOKEN_RE.fullmatch(only) is None:
        return None, "NO_NUMBER"
    return only.replace("/", "-"), "OK"


def _consistency_flag(bare: str | None, citation_formatted: str) -> str:
    """Cross-check the proposed bare number against citation_formatted.

    Returns "NO_CITATION" when citation_formatted is empty, "NO_NUMBER" when
    there is no bare number, "OK" when the bare number equals one of the whole
    number runs found in the citation (after '/'→'-'), otherwise "MISMATCH".

    Whole runs are compared rather than substrings, so "234-21" is not
    considered present in a citation that contains "1234/21".
    """
    if not citation_formatted:
        return "NO_CITATION"
    if not bare:
        return "NO_NUMBER"
    cited = {
        run.replace("/", "-")
        for run in _NUMBER_RUN_RE.findall(citation_formatted)
    }
    return "OK" if bare in cited else "MISMATCH"


def _is_flagged(row: dict) -> bool:
    """Return True when a reconciliation row needs a chair decision."""
    return (
        row["extract_flag"] != "OK"
        or row["consistency"] == "MISMATCH"
        or bool(row["dup_check"])
    )


def _md_cell(value: object, limit: int | None = None) -> str:
    """Render a value as one Markdown table cell.

    Truncates to *limit* characters when given, escapes '|' and flattens line
    breaks so the value cannot break the table layout.
    """
    text = "" if value is None else str(value)
    if limit is not None:
        text = text[:limit]
    return text.replace("|", "\\|").replace("\r", " ").replace("\n", " ")


async def _build_reconciliation() -> list[dict]:
    """Read all internal_committee rows and build the reconciliation table.

    Each returned dict carries the current value, the proposed bare number,
    the extraction and consistency flags, whether the row would change, and
    a "DUP_CHECK" marker when several rows propose the same bare number.
    """
    # Imported lazily: the module reads POSTGRES_URL, which is set above.
    from legal_mcp.services import db

    pool = await db.get_pool()
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            "SELECT id, case_number, proceeding_type, coalesce(citation_formatted,'') AS cf "
            "FROM case_law WHERE source_kind='internal_committee' ORDER BY case_number")

    out: list[dict] = []
    for r in rows:
        # A NULL case_number is reported as "" so later string handling
        # (slicing for the Markdown table) cannot fail on None.
        current = r["case_number"] or ""
        bare, flag = _extract_bare(current)
        cons = _consistency_flag(bare, r["cf"])
        changes = bare is not None and bare != current
        out.append({
            "id": str(r["id"]),
            "current_case_number": current,
            "proposed_bare": bare or "",
            "proceeding_type": r["proceeding_type"] or "",
            "citation_formatted": r["cf"],
            "extract_flag": flag,
            "consistency": cons,
            "will_change": "yes" if changes else "no",
        })

    bare_counts = Counter(d["proposed_bare"] for d in out if d["proposed_bare"])
    for d in out:
        is_dup = bool(d["proposed_bare"]) and bare_counts[d["proposed_bare"]] > 1
        d["dup_check"] = "DUP_CHECK" if is_dup else ""
    return out


def _ts() -> str:
    """Return the current UTC time as a compact timestamp for file names."""
    return datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")


def _write_table(rows: list[dict], ts: str) -> tuple[Path, Path]:
    """Write the reconciliation table as CSV and Markdown under AUDIT_DIR.

    The CSV holds every row; the Markdown report lists the flagged rows first
    and then every proposed change. Returns (csv_path, md_path).
    """
    AUDIT_DIR.mkdir(parents=True, exist_ok=True)
    csv_path = AUDIT_DIR / f"fu2b-reconciliation-{ts}.csv"
    md_path = AUDIT_DIR / f"fu2b-reconciliation-{ts}.md"
    cols = ["id", "current_case_number", "proposed_bare", "proceeding_type",
            "citation_formatted", "extract_flag", "consistency", "dup_check",
            "will_change"]
    with csv_path.open("w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=cols)
        w.writeheader()
        w.writerows(rows)

    changing = [r for r in rows if r["will_change"] == "yes"]
    flagged = [r for r in rows if _is_flagged(r)]

    with md_path.open("w", encoding="utf-8") as f:
        f.write(f"# FU-2b — טבלת-תיאום מזהים (internal_committee) — {ts}\n\n")
        f.write(f"- סה\"כ רשומות: {len(rows)}\n- ישתנו: {len(changing)}\n- מסומנות לסקירה: {len(flagged)}\n\n")
        f.write("## דורש הכרעת-יו\"ר (flags)\n\n")
        f.write("| current_case_number | proposed_bare | proc | flags |\n|---|---|---|---|\n")
        for r in flagged:
            # Only the flags that actually triggered are listed.
            fl = " ".join(x for x in [
                r["extract_flag"] if r["extract_flag"] != "OK" else "",
                r["consistency"] if r["consistency"] == "MISMATCH" else "",
                r["dup_check"],
            ] if x)
            f.write(
                f"| {_md_cell(r['current_case_number'], 50)} "
                f"| {_md_cell(r['proposed_bare'])} "
                f"| {_md_cell(r['proceeding_type'])} | {fl} |\n")
        f.write("\n## כל השינויים המוצעים\n\n")
        f.write("| current_case_number | → proposed_bare | proc |\n|---|---|---|\n")
        for r in changing:
            f.write(
                f"| {_md_cell(r['current_case_number'], 55)} "
                f"| {_md_cell(r['proposed_bare'])} "
                f"| {_md_cell(r['proceeding_type'])} |\n")
    return csv_path, md_path


async def _apply(approved_csv: Path, ts: str) -> dict:
    """Apply the chair-approved rows from *approved_csv* to case_law.

    Only rows with will_change == "yes", a proposed_bare and an id are
    considered. For each one the current value is read (restricted to
    source_kind='internal_committee'); rows that are missing, out of scope or
    already hold the proposed value are skipped. The backup CSV of old values
    is written and closed BEFORE any UPDATE is issued, and all updates run in
    one transaction so a failure leaves the table untouched.

    Returns a dict with "applied", "skipped" and "backup", or
    {"applied": 0, "note": ...} when the CSV holds no approved changes.
    """
    # Imported lazily: the module reads POSTGRES_URL, which is set above.
    from legal_mcp.services import db

    # utf-8-sig tolerates the BOM that spreadsheet tools add on export, which
    # would otherwise corrupt the first header ("\ufeffid").
    with approved_csv.open(encoding="utf-8-sig", newline="") as f:
        approved = [
            r for r in csv.DictReader(f)
            if r.get("will_change") == "yes"
            and (r.get("proposed_bare") or "").strip()
            and (r.get("id") or "").strip()
        ]
    if not approved:
        return {"applied": 0, "note": "no approved changing rows"}

    AUDIT_DIR.mkdir(parents=True, exist_ok=True)
    backup = AUDIT_DIR / f"fu2b-backup-{ts}.csv"
    pool = await db.get_pool()
    pending: list[tuple[str, str, str]] = []
    async with pool.acquire() as conn:
        async with conn.transaction():
            for r in approved:
                row_id = r["id"].strip()
                new = r["proposed_bare"].strip()
                # Same scope filter as the UPDATE, so every backed-up row is
                # one that will really be modified.
                old = await conn.fetchval(
                    "SELECT case_number FROM case_law WHERE id=$1 "
                    "AND source_kind='internal_committee'", row_id)
                if old is None or old == new:
                    continue
                pending.append((row_id, old, new))

            # Backup first: the file is complete on disk before any write.
            with backup.open("w", newline="", encoding="utf-8") as bf:
                bw = csv.writer(bf)
                bw.writerow(["id", "old_case_number"])
                bw.writerows((row_id, old) for row_id, old, _ in pending)

            for row_id, _, new in pending:
                await conn.execute(
                    "UPDATE case_law SET case_number=$2 WHERE id=$1 "
                    "AND source_kind='internal_committee'", row_id, new)
    return {
        "applied": len(pending),
        "skipped": len(approved) - len(pending),
        "backup": str(backup),
    }


async def main() -> int:
    """CLI entry point; returns the process exit code.

    Without --apply a dry-run table is written for review (exit 0). With
    --apply the rows of the --approved CSV are applied; a missing or
    non-existent --approved path is reported on stderr with exit code 2.
    """
    parser = argparse.ArgumentParser(description="FU-2b internal case_number reconciliation")
    parser.add_argument("--apply", action="store_true", help="apply approved changes (default: dry-run)")
    parser.add_argument("--approved", type=str, help="path to chair-approved CSV (required with --apply)")
    args = parser.parse_args()
    ts = _ts()

    if not args.apply:
        rows = await _build_reconciliation()
        csv_path, md_path = _write_table(rows, ts)
        changing = sum(1 for r in rows if r["will_change"] == "yes")
        flagged = sum(1 for r in rows if _is_flagged(r))
        print(f"DRY-RUN: {len(rows)} rows | will_change={changing} | flagged={flagged}")
        print(f" table: {md_path}")
        print(f" csv: {csv_path}")
        print("Review the table with the chair, then run --apply --approved <csv>.")
        return 0

    if not args.approved:
        print("ERROR: --apply requires --approved (the chair-reviewed table).", file=sys.stderr)
        return 2
    approved_path = Path(args.approved)
    if not approved_path.is_file():
        print(f"ERROR: approved CSV not found: {approved_path}", file=sys.stderr)
        return 2
    result = await _apply(approved_path, ts)
    print(f"APPLIED: {result}")
    return 0


if __name__ == "__main__":
    sys.exit(asyncio.run(main()))