Files
legal-ai/scripts/fu2b_reconcile_internal_case_numbers.py
Chaim e46868feda feat(fu2b): flag PROC_MISMATCH (case_number prefix vs proceeding_type) for chair
Dry-run surfaced 2 rows with בל"מ prefix but proceeding_type=ערר. Since the
migration strips the prefix, a wrong proceeding_type would silently lose the
בל"מ signal — must be chair-adjudicated, not auto-applied. Chair table now
flags 4 rows: 2 DUP_CHECK (8047-23) + 2 PROC_MISMATCH.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 08:57:42 +00:00

215 lines
9.3 KiB
Python
Executable File
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""FU-2b — reconcile internal_committee case_number → canonical bare number.
Rewrites case_number values that currently hold a full citation into the
canonical normalized bare number (X1: trim · prefix-strip · '/' -> '-', month
preserved). citation_formatted is the display field and is left untouched.
DETERMINISTIC — no LLM. Extraction takes the single case-number-shaped token
from the value; 0 or >1 tokens are flagged for chair review, never guessed.
Usage (must use the mcp-server venv — asyncpg/pgvector vendored there):
PY=/home/chaim/legal-ai/mcp-server/.venv/bin/python
# Dry-run (default): builds the reconciliation table for chair review.
$PY scripts/fu2b_reconcile_internal_case_numbers.py
# Apply ONLY the chair-approved rows (after Dafna's review), backup first:
$PY scripts/fu2b_reconcile_internal_case_numbers.py --apply \
--approved data/audit/fu2b-approved-<ts>.csv
Scope: source_kind='internal_committee' only (external → #68/FU-2c). FK-safe:
all case_law FKs reference case_law.id (UUID), not case_number.
"""
from __future__ import annotations
import argparse
import asyncio
import csv
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
# Two levels up from this file: the script is run as scripts/<name>.py from the repo.
REPO_ROOT = Path(__file__).resolve().parents[1]

# Put the MCP server sources first on the import path so `legal_mcp` resolves.
sys.path.insert(0, str(REPO_ROOT / "mcp-server" / "src"))

# Only synthesize a connection URL when the caller has not provided one; the
# individual POSTGRES_* variables (with local defaults) supply the parts.
os.environ.setdefault(
    "POSTGRES_URL",
    (
        f"postgres://{os.environ.get('POSTGRES_USER','legal_ai')}:"
        f"{os.environ.get('POSTGRES_PASSWORD','')}@"
        f"{os.environ.get('POSTGRES_HOST','127.0.0.1')}:"
        f"{os.environ.get('POSTGRES_PORT','5433')}/"
        f"{os.environ.get('POSTGRES_DB','legal_ai')}"
    ),
)

# Destination directory for reconciliation tables and pre-apply backups.
AUDIT_DIR = REPO_ROOT / "data" / "audit"
_TOKEN_RE = re.compile(r"[0-9]{2,6}(?:[-/][0-9]{1,2}){1,2}")
def _extract_bare(case_number: str) -> tuple[str | None, str]:
"""Return (canonical_bare, flag). flag ∈ {OK, NO_NUMBER, MULTI_NUMBER}.
Deterministic: finds case-number-shaped tokens (NNNN/YY or NNNN-MM-YY).
Exactly one → normalize '/''-' (month preserved, none invented). 0 or >1
→ None + flag (chair decides; never guess).
"""
tokens = _TOKEN_RE.findall(case_number or "")
if len(tokens) == 1:
return tokens[0].replace("/", "-"), "OK"
if not tokens:
return None, "NO_NUMBER"
return None, "MULTI_NUMBER"
def _consistency_flag(bare: str | None, citation_formatted: str) -> str:
"""OK if bare appears in citation_formatted; MISMATCH if not; NO_CITATION if empty."""
if not citation_formatted:
return "NO_CITATION"
if not bare:
return "NO_NUMBER"
cf = citation_formatted.replace("/", "-")
return "OK" if bare in cf else "MISMATCH"
def _proc_mismatch(case_number: str, proceeding_type: str) -> bool:
    """Tell whether case_number's leading proceeding prefix contradicts proceeding_type.

    The migration strips the prefix from case_number, so a בל"מ prefix paired
    with proceeding_type='ערר' (or vice-versa) would silently lose the
    proceeding signal. Such rows must be flagged for chair adjudication,
    never auto-applied.

    Args:
        case_number: Current case_number text (``None`` treated as empty).
        proceeding_type: Current proceeding_type (``None`` treated as empty).

    Returns:
        ``True`` when case_number starts with a recognized prefix (בל"מ or
        ערר) and proceeding_type is non-empty and names a different
        proceeding; ``False`` otherwise, including when proceeding_type is
        empty or no recognized prefix is present.
    """
    # Bidi/format controls spelled as escapes: literal invisible characters in
    # source are easily lost by editors and copy-paste, which turns the strip
    # into a no-op. LRM, RLM, embeddings/overrides, isolates, BOM.
    bidi_marks = (
        "\u200e\u200f"
        "\u202a\u202b\u202c\u202d\u202e"
        "\u2066\u2067\u2068\u2069"
        "\ufeff"
    )
    # Hebrew gershayim and curly double quotes are typed interchangeably with
    # the ASCII quote in the abbreviation; fold them all to '"'.
    quote_variants = str.maketrans({"\u05f4": '"', "\u201c": '"', "\u201d": '"'})

    def _norm(value):
        text = (value or "").translate(quote_variants)
        # Whitespace and marks can be interleaved, so strip until stable.
        while True:
            trimmed = text.strip().strip(bidi_marks)
            if trimmed == text:
                return text
            text = trimmed

    cn = _norm(case_number)
    pt = _norm(proceeding_type)
    if not pt:
        # Nothing to contradict when the proceeding type is unset.
        return False
    if cn.startswith('בל"מ'):
        return pt != 'בל"מ'
    if cn.startswith("ערר"):
        return pt != "ערר"
    return False
async def _build_reconciliation() -> list[dict]:
    """Build one reconciliation entry per internal_committee case_law row.

    Reads id, case_number, proceeding_type and citation_formatted (NULL read
    as '') ordered by case_number, then derives for each row the proposed
    bare number and its review flags.

    Returns:
        A list of dicts with keys ``id``, ``current_case_number``,
        ``proposed_bare``, ``proceeding_type``, ``citation_formatted``,
        ``extract_flag``, ``consistency``, ``proc_flag``, ``will_change``
        and ``dup_check``.
    """
    from collections import Counter

    from legal_mcp.services import db

    pool = await db.get_pool()
    async with pool.acquire() as conn:
        records = await conn.fetch(
            "SELECT id, case_number, proceeding_type, coalesce(citation_formatted,'') AS cf "
            "FROM case_law WHERE source_kind='internal_committee' ORDER BY case_number")

    table: list[dict] = []
    for rec in records:
        current = rec["case_number"]
        proc_type = rec["proceeding_type"] or ""
        citation = rec["cf"]
        proposed, extract_flag = _extract_bare(current)
        # A row changes only when extraction succeeded and differs from what is stored.
        differs = proposed is not None and proposed != current
        table.append({
            "id": str(rec["id"]),
            "current_case_number": current,
            "proposed_bare": proposed or "",
            "proceeding_type": proc_type,
            "citation_formatted": citation,
            "extract_flag": extract_flag,
            "consistency": _consistency_flag(proposed, citation),
            "proc_flag": "PROC_MISMATCH" if _proc_mismatch(current, proc_type) else "",
            "will_change": "yes" if differs else "no",
        })

    # Mark every entry whose proposed number is shared with another entry.
    occurrences = Counter(e["proposed_bare"] for e in table if e["proposed_bare"])
    for entry in table:
        shared = bool(entry["proposed_bare"]) and occurrences[entry["proposed_bare"]] > 1
        entry["dup_check"] = "DUP_CHECK" if shared else ""
    return table
def _ts() -> str:
    """Return the current UTC time as a compact ``YYYYMMDDTHHMMSSZ`` stamp."""
    now_utc = datetime.now(tz=timezone.utc)
    return now_utc.strftime("%Y%m%dT%H%M%SZ")
def _write_table(rows: list[dict], ts: str) -> tuple[Path, Path]:
    """Write the reconciliation table as CSV and as a Markdown review document.

    Both files are created under ``AUDIT_DIR`` (created if missing) and named
    ``fu2b-reconciliation-<ts>.csv`` / ``.md``. The CSV holds every row. The
    Markdown holds summary counts, a table of rows needing a chair decision
    (extract_flag other than OK, consistency MISMATCH, a dup_check, or a
    proc_flag), and a table of all rows whose will_change is "yes".

    Args:
        rows: Reconciliation entries keyed by the column names written below.
        ts: Timestamp string used in the file names and the Markdown title.

    Returns:
        ``(csv_path, md_path)``.
    """
    def _cell(value, limit=None):
        # Markdown table cell: tolerate None (e.g. a NULL case_number), apply
        # the display truncation, then neutralize the characters that would
        # break the row — '|' splits the cell, a newline ends the row.
        text = "" if value is None else str(value)
        if limit is not None:
            text = text[:limit]
        return text.replace("|", "\\|").replace("\r", " ").replace("\n", " ")

    def _needs_review(r):
        return bool(r["extract_flag"] != "OK" or r["consistency"] == "MISMATCH"
                    or r["dup_check"] or r["proc_flag"])

    AUDIT_DIR.mkdir(parents=True, exist_ok=True)
    csv_path = AUDIT_DIR / f"fu2b-reconciliation-{ts}.csv"
    md_path = AUDIT_DIR / f"fu2b-reconciliation-{ts}.md"
    cols = ["id", "current_case_number", "proposed_bare", "proceeding_type",
            "citation_formatted", "extract_flag", "consistency", "proc_flag", "dup_check", "will_change"]
    with csv_path.open("w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=cols)
        w.writeheader()
        w.writerows(rows)

    changing = [r for r in rows if r["will_change"] == "yes"]
    flagged = [r for r in rows if _needs_review(r)]
    with md_path.open("w", encoding="utf-8") as f:
        f.write(f"# FU-2b — טבלת-תיאום מזהים (internal_committee) — {ts}\n\n")
        f.write(f"- סה\"כ רשומות: {len(rows)}\n- ישתנו: {len(changing)}\n- מסומנות לסקירה: {len(flagged)}\n\n")
        f.write("## דורש הכרעת-יו\"ר (flags)\n\n")
        f.write("| current_case_number | proposed_bare | proc | flags |\n|---|---|---|---|\n")
        for r in flagged:
            # Only the non-trivial flag values are listed, space separated.
            fl = " ".join(x for x in [r["extract_flag"] if r["extract_flag"] != "OK" else "",
                                      r["consistency"] if r["consistency"] == "MISMATCH" else "",
                                      r["proc_flag"], r["dup_check"]] if x)
            f.write(f"| {_cell(r['current_case_number'], 50)} | {_cell(r['proposed_bare'])} "
                    f"| {_cell(r['proceeding_type'])} | {fl} |\n")
        f.write("\n## כל השינויים המוצעים\n\n")
        f.write("| current_case_number | → proposed_bare | proc |\n|---|---|---|\n")
        for r in changing:
            f.write(f"| {_cell(r['current_case_number'], 55)} | {_cell(r['proposed_bare'])} "
                    f"| {_cell(r['proceeding_type'])} |\n")
    return csv_path, md_path
async def _apply(approved_csv: Path, ts: str) -> dict:
    """Apply the chair-approved case_number rewrites, writing a backup first.

    Rows are taken from ``approved_csv`` when ``will_change`` is "yes" and
    ``proposed_bare`` is non-empty. For each, the current case_number of the
    matching internal_committee row is recorded in
    ``AUDIT_DIR/fu2b-backup-<ts>.csv`` and then overwritten with
    ``proposed_bare``.

    All updates run in a single transaction: if any statement fails, none of
    the rewrites is kept (the backup file may still list rows from the
    attempted run) and the exception propagates.

    Args:
        approved_csv: Path to the reviewed CSV (columns ``id``,
            ``proposed_bare``, ``will_change`` are read).
        ts: Timestamp string used in the backup file name.

    Returns:
        ``{"applied": 0, "note": ...}`` when the CSV has no qualifying rows;
        otherwise ``{"applied": n, "skipped": m, "backup": path}`` where
        ``skipped`` counts approved ids with no matching internal_committee row.
    """
    # newline="" lets the csv module handle quoted embedded newlines itself.
    with approved_csv.open(encoding="utf-8", newline="") as f:
        approved = [r for r in csv.DictReader(f)
                    if r.get("will_change") == "yes" and r.get("proposed_bare")]
    if not approved:
        return {"applied": 0, "note": "no approved changing rows"}

    # Imported only once there is work to do.
    from legal_mcp.services import db

    AUDIT_DIR.mkdir(parents=True, exist_ok=True)
    backup = AUDIT_DIR / f"fu2b-backup-{ts}.csv"
    pool = await db.get_pool()
    applied = 0
    skipped = 0
    with backup.open("w", newline="", encoding="utf-8") as bf:
        bw = csv.writer(bf)
        bw.writerow(["id", "old_case_number"])
        async with pool.acquire() as conn:
            # NOTE(review): assumes an asyncpg connection (per the module
            # docstring); Connection.transaction() commits on clean exit and
            # rolls back if the block raises.
            async with conn.transaction():
                for r in approved:
                    # Same source_kind filter as the UPDATE, so the backup and
                    # the applied count cover exactly the rows that are rewritten.
                    old = await conn.fetchval(
                        "SELECT case_number FROM case_law WHERE id=$1 "
                        "AND source_kind='internal_committee'",
                        r["id"])
                    if old is None:
                        skipped += 1
                        continue
                    bw.writerow([r["id"], old])
                    await conn.execute(
                        "UPDATE case_law SET case_number=$2 WHERE id=$1 "
                        "AND source_kind='internal_committee'",
                        r["id"], r["proposed_bare"])
                    applied += 1
                # Push the backup out of the write buffer before the commit.
                bf.flush()
    return {"applied": applied, "skipped": skipped, "backup": str(backup)}
async def main() -> int:
    """Command-line entry: dry-run by default, apply when ``--apply`` is given.

    Dry-run builds the reconciliation table, writes the CSV/Markdown files and
    prints a summary. ``--apply`` requires ``--approved <csv>`` and applies the
    rows from that file.

    Returns:
        0 on success; 2 when ``--apply`` is given without ``--approved``.
    """
    cli = argparse.ArgumentParser(description="FU-2b internal case_number reconciliation")
    cli.add_argument("--apply", action="store_true", help="apply approved changes (default: dry-run)")
    cli.add_argument("--approved", type=str, help="path to chair-approved CSV (required with --apply)")
    opts = cli.parse_args()
    stamp = _ts()

    if opts.apply:
        if not opts.approved:
            print("ERROR: --apply requires --approved <csv> (the chair-reviewed table).", file=sys.stderr)
            return 2
        outcome = await _apply(Path(opts.approved), stamp)
        print(f"APPLIED: {outcome}")
        return 0

    # Dry-run path: nothing is written to the database.
    table = await _build_reconciliation()
    csv_path, md_path = _write_table(table, stamp)
    changing = 0
    flagged = 0
    for entry in table:
        if entry["will_change"] == "yes":
            changing += 1
        if (entry["extract_flag"] != "OK" or entry["consistency"] == "MISMATCH"
                or entry["dup_check"] or entry["proc_flag"]):
            flagged += 1
    print(f"DRY-RUN: {len(table)} rows | will_change={changing} | flagged={flagged}")
    print(f" table: {md_path}")
    print(f" csv: {csv_path}")
    print("Review the table with the chair, then run --apply --approved <reviewed.csv>.")
    return 0
# Script entry point: run the async CLI to completion and use the integer it
# returns as the process exit status.
if __name__ == "__main__":
    sys.exit(asyncio.run(main()))