#!/usr/bin/env python3
"""FU-2b — reconcile internal_committee case_number → canonical bare number.

Rewrites case_number values that currently hold a full citation into the
canonical normalized bare number (X1: trim · prefix-strip · '/'→'-', month
preserved). citation_formatted is the display field and is left untouched.

DETERMINISTIC — no LLM. Extraction takes the single case-number-shaped token
from the value; 0 or >1 tokens are flagged for chair review, never guessed.
A token that is glued to further digits (for example a four-digit year) is
flagged as well instead of being silently truncated.

Usage (must use the mcp-server venv — asyncpg/pgvector vendored there):
    PY=/home/chaim/legal-ai/mcp-server/.venv/bin/python

    # Dry-run (default): builds the reconciliation table for chair review.
    $PY scripts/fu2b_reconcile_internal_case_numbers.py

    # Apply ONLY the chair-approved rows (after Dafna's review), backup first:
    $PY scripts/fu2b_reconcile_internal_case_numbers.py --apply \
        --approved data/audit/fu2b-approved-TIMESTAMP.csv

Scope: source_kind='internal_committee' only (external → #68/FU-2c). FK-safe:
all case_law FKs reference case_law.id (UUID), not case_number.
"""
from __future__ import annotations

import argparse
import asyncio
import csv
import os
import re
import sys
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path

REPO_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(REPO_ROOT / "mcp-server" / "src"))

# Build a connection URL from the individual POSTGRES_* variables only when
# the caller has not provided POSTGRES_URL explicitly.
if "POSTGRES_URL" not in os.environ:
    os.environ["POSTGRES_URL"] = (
        f"postgres://{os.environ.get('POSTGRES_USER','legal_ai')}:"
        f"{os.environ.get('POSTGRES_PASSWORD','')}@"
        f"{os.environ.get('POSTGRES_HOST','127.0.0.1')}:"
        f"{os.environ.get('POSTGRES_PORT','5433')}/"
        f"{os.environ.get('POSTGRES_DB','legal_ai')}"
    )

AUDIT_DIR = REPO_ROOT / "data" / "audit"

# Case-number-shaped token: NNNN/YY or NNNN-MM-YY (2-6 digits, then one or two
# separator + 1-2 digit groups).
_TOKEN_RE = re.compile(r"[0-9]{2,6}(?:[-/][0-9]{1,2}){1,2}")
# Text immediately before / after a token that means the token is only a
# fragment of a longer number (a digit, optionally across one separator).
_GLUED_BEFORE_RE = re.compile(r"[0-9][-/]?\Z")
_GLUED_AFTER_RE = re.compile(r"[-/]?[0-9]")


def _is_standalone(text: str, start: int, end: int) -> bool:
    """Return True if text[start:end] is not glued to neighbouring digits."""
    glued_before = _GLUED_BEFORE_RE.search(text[:start])
    glued_after = _GLUED_AFTER_RE.match(text[end:])
    return not (glued_before or glued_after)


def _extract_bare(case_number: str | None) -> tuple[str | None, str]:
    """Return (canonical_bare, flag). flag ∈ {OK, NO_NUMBER, MULTI_NUMBER}.

    Deterministic: finds case-number-shaped tokens (NNNN/YY or NNNN-MM-YY).
    Exactly one standalone token → normalize '/'→'-' (month preserved, none
    invented). 0 or >1 tokens → None + flag (chair decides; never guess).

    A single token that is cut out of a longer digit run (e.g. the "20" of a
    four-digit "2024", or the tail of a 7-digit number) is NOT a clean case
    number; it is reported as NO_NUMBER rather than truncated.
    """
    text = case_number or ""
    matches = list(_TOKEN_RE.finditer(text))
    if not matches:
        return None, "NO_NUMBER"
    if len(matches) > 1:
        return None, "MULTI_NUMBER"
    token = matches[0]
    if not _is_standalone(text, token.start(), token.end()):
        # The regex matched only part of a longer number — refuse to guess.
        return None, "NO_NUMBER"
    return token.group(0).replace("/", "-"), "OK"


def _consistency_flag(bare: str | None, citation_formatted: str) -> str:
    """Compare the proposed bare number against the display citation.

    Returns NO_CITATION if citation_formatted is empty, NO_NUMBER if there is
    no bare number, OK if bare occurs in the citation ('/' treated as '-') as
    a standalone number, and MISMATCH otherwise. An occurrence that is merely
    a fragment of a longer number (e.g. "123-20" inside "5123-20") does not
    count as a match.
    """
    if not citation_formatted:
        return "NO_CITATION"
    if not bare:
        return "NO_NUMBER"
    normalized = citation_formatted.replace("/", "-")
    for hit in re.finditer(re.escape(bare), normalized):
        if _is_standalone(normalized, hit.start(), hit.end()):
            return "OK"
    return "MISMATCH"


def _is_flagged(row: dict) -> bool:
    """Return True if a reconciliation row needs a chair decision."""
    return (
        row["extract_flag"] != "OK"
        or row["consistency"] == "MISMATCH"
        or bool(row["dup_check"])
    )


async def _build_reconciliation() -> list[dict]:
    """Read all internal_committee rows and build the reconciliation table.

    Each returned dict carries the current and proposed case number, the
    extraction and consistency flags, whether the value would change, and a
    DUP_CHECK marker when several rows map to the same proposed bare number.
    """
    from legal_mcp.services import db

    pool = await db.get_pool()
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            "SELECT id, case_number, proceeding_type, coalesce(citation_formatted,'') AS cf "
            "FROM case_law WHERE source_kind='internal_committee' ORDER BY case_number")

    out: list[dict] = []
    for r in rows:
        bare, flag = _extract_bare(r["case_number"])
        changes = bare is not None and bare != r["case_number"]
        out.append({
            "id": str(r["id"]),
            "current_case_number": r["case_number"],
            "proposed_bare": bare or "",
            "proceeding_type": r["proceeding_type"] or "",
            "citation_formatted": r["cf"],
            "extract_flag": flag,
            "consistency": _consistency_flag(bare, r["cf"]),
            "will_change": "yes" if changes else "no",
        })

    # Several rows collapsing onto the same bare number need a human look.
    bare_counts = Counter(d["proposed_bare"] for d in out if d["proposed_bare"])
    for d in out:
        is_dup = bool(d["proposed_bare"]) and bare_counts[d["proposed_bare"]] > 1
        d["dup_check"] = "DUP_CHECK" if is_dup else ""
    return out


def _ts() -> str:
    """Return the current UTC time as a compact, filename-safe timestamp."""
    return datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")


def _md_cell(value: object, limit: int | None = None) -> str:
    """Render a value as a Markdown table cell.

    None becomes an empty cell, the text is optionally truncated to `limit`
    characters, and characters that would break the table ('|', newlines)
    are neutralized.
    """
    text = "" if value is None else str(value)
    if limit is not None:
        text = text[:limit]
    return text.replace("|", "\\|").replace("\n", " ")


def _write_table(rows: list[dict], ts: str) -> tuple[Path, Path]:
    """Write the reconciliation table as CSV and Markdown under AUDIT_DIR.

    The CSV holds every row; the Markdown report lists the flagged rows and
    all proposed changes. Returns (csv_path, md_path).
    """
    AUDIT_DIR.mkdir(parents=True, exist_ok=True)
    csv_path = AUDIT_DIR / f"fu2b-reconciliation-{ts}.csv"
    md_path = AUDIT_DIR / f"fu2b-reconciliation-{ts}.md"
    cols = ["id", "current_case_number", "proposed_bare", "proceeding_type",
            "citation_formatted", "extract_flag", "consistency", "dup_check", "will_change"]
    with csv_path.open("w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=cols)
        w.writeheader()
        w.writerows(rows)

    changing = [r for r in rows if r["will_change"] == "yes"]
    flagged = [r for r in rows if _is_flagged(r)]
    with md_path.open("w", encoding="utf-8") as f:
        f.write(f"# FU-2b — טבלת-תיאום מזהים (internal_committee) — {ts}\n\n")
        f.write(f"- סה\"כ רשומות: {len(rows)}\n- ישתנו: {len(changing)}\n- מסומנות לסקירה: {len(flagged)}\n\n")
        f.write("## דורש הכרעת-יו\"ר (flags)\n\n")
        f.write("| current_case_number | proposed_bare | proc | flags |\n|---|---|---|---|\n")
        for r in flagged:
            # Only the flags that actually fired are listed.
            fired = [
                r["extract_flag"] if r["extract_flag"] != "OK" else "",
                r["consistency"] if r["consistency"] == "MISMATCH" else "",
                r["dup_check"],
            ]
            fl = " ".join(x for x in fired if x)
            current = _md_cell(r["current_case_number"], 50)
            proposed = _md_cell(r["proposed_bare"])
            proc = _md_cell(r["proceeding_type"])
            f.write(f"| {current} | {proposed} | {proc} | {fl} |\n")
        f.write("\n## כל השינויים המוצעים\n\n")
        f.write("| current_case_number | → proposed_bare | proc |\n|---|---|---|\n")
        for r in changing:
            current = _md_cell(r["current_case_number"], 55)
            proposed = _md_cell(r["proposed_bare"])
            proc = _md_cell(r["proceeding_type"])
            f.write(f"| {current} | {proposed} | {proc} |\n")
    return csv_path, md_path


async def _apply(approved_csv: Path, ts: str) -> dict:
    """Apply the chair-approved rows and write a backup of the old values.

    Only rows with will_change == "yes", a non-empty proposed_bare and an id
    are considered. All updates run in one transaction, so a failure leaves
    the table unchanged (the backup file may then list rows that were not
    actually modified). A row is skipped, not updated, when:

    * it is not found as an internal_committee row (or its case_number is NULL),
    * the approved CSV carries current_case_number and the database value has
      changed since the review (stale approval),
    * the database already holds the proposed value.

    Returns a summary dict with the number of applied rows, the skip counts
    per reason, and the backup path.
    """
    from legal_mcp.services import db

    # utf-8-sig transparently drops a BOM added by spreadsheet tools, which
    # would otherwise corrupt the first header name ("id").
    with approved_csv.open(newline="", encoding="utf-8-sig") as f:
        approved = [r for r in csv.DictReader(f)
                    if r.get("will_change") == "yes" and r.get("proposed_bare") and r.get("id")]
    if not approved:
        return {"applied": 0, "note": "no approved changing rows"}

    AUDIT_DIR.mkdir(parents=True, exist_ok=True)
    backup = AUDIT_DIR / f"fu2b-backup-{ts}.csv"
    pool = await db.get_pool()
    applied = 0
    skipped: Counter = Counter()
    with backup.open("w", newline="", encoding="utf-8") as bf:
        bw = csv.writer(bf)
        bw.writerow(["id", "old_case_number"])
        async with pool.acquire() as conn:
            async with conn.transaction():
                for r in approved:
                    # Same scope filter as the UPDATE, so the backup and the
                    # applied count describe exactly the rows that change.
                    old = await conn.fetchval(
                        "SELECT case_number FROM case_law WHERE id=$1 "
                        "AND source_kind='internal_committee'",
                        r["id"])
                    if old is None:
                        skipped["not_found"] += 1
                        continue
                    reviewed = r.get("current_case_number")
                    if reviewed is not None and reviewed != old:
                        # The value changed after the chair reviewed it.
                        skipped["stale"] += 1
                        continue
                    if old == r["proposed_bare"]:
                        skipped["already_canonical"] += 1
                        continue
                    bw.writerow([r["id"], old])
                    await conn.execute(
                        "UPDATE case_law SET case_number=$2 WHERE id=$1 "
                        "AND source_kind='internal_committee'",
                        r["id"], r["proposed_bare"])
                    applied += 1
    return {"applied": applied, "skipped": dict(skipped), "backup": str(backup)}


async def main() -> int:
    """CLI entry point. Returns the process exit code (0 ok, 2 usage error)."""
    parser = argparse.ArgumentParser(description="FU-2b internal case_number reconciliation")
    parser.add_argument("--apply", action="store_true", help="apply approved changes (default: dry-run)")
    parser.add_argument("--approved", type=str, help="path to chair-approved CSV (required with --apply)")
    args = parser.parse_args()
    ts = _ts()

    if not args.apply:
        rows = await _build_reconciliation()
        csv_path, md_path = _write_table(rows, ts)
        changing = sum(1 for r in rows if r["will_change"] == "yes")
        flagged = sum(1 for r in rows if _is_flagged(r))
        print(f"DRY-RUN: {len(rows)} rows | will_change={changing} | flagged={flagged}")
        print(f"  table: {md_path}")
        print(f"  csv:   {csv_path}")
        print("Review the table with the chair, then run --apply --approved APPROVED_CSV.")
        return 0

    if not args.approved:
        print("ERROR: --apply requires --approved (the chair-reviewed table).", file=sys.stderr)
        return 2
    approved_path = Path(args.approved)
    if not approved_path.is_file():
        print(f"ERROR: approved CSV not found: {approved_path}", file=sys.stderr)
        return 2
    result = await _apply(approved_path, ts)
    print(f"APPLIED: {result}")
    return 0


if __name__ == "__main__":
    sys.exit(asyncio.run(main()))