#!/usr/bin/env python3 """FU-2c — reconcile external case_law case_number → canonical designator+docket. External court precedents stored the FULL citation (proceeding designator + docket + district qualifier + parties + Nevo date) inside case_number. The canonical form (chair decision 2026-05-31, Option A; consistent with db.py:369 comment "עע\"מ 3975/22" and INV-ID2/X1) is **proceeding-designator + docket only**, with court-docket '/' PRESERVED (court convention, NOT normalized to '-'). Parties / court-name / district-qualifier / Nevo-date live in citation_formatted (the display field), never in the identifier. Current : עע"מ (מינהליים ת"א) 14306-09-23 עדינה בולקינד נ' הוועדה... (נבו 11.2.2024) Target : עמ"נ 14306-09-23 (designator + docket; '/' kept where present) DETERMINISTIC — no LLM. The docket is the single docket-shaped token; the designator is the leading proceeding token. 0 dockets, >1 distinct dockets, a citation whose docket disagrees with case_number, or an empty citation are FLAGGED for chair review, never guessed/auto-applied. Scope: source_kind <> 'internal_committee' (external_upload / cited_only / nevo_seed) — the external partial-unique space (X1 §2, uq_case_law_external_number). internal_committee was handled by FU-2b. FK-safe: all case_law FKs reference case_law.id (UUID), not case_number. Usage (must use the mcp-server venv — asyncpg/pgvector vendored there): PY=/home/chaim/legal-ai/mcp-server/.venv/bin/python # Dry-run (default): builds the reconciliation table for chair review. $PY scripts/fu2c_reconcile_external_case_numbers.py # Apply ONLY chair-approved rows (after Dafna's review), backup first: $PY scripts/fu2c_reconcile_external_case_numbers.py --apply \ --approved data/audit/fu2c-reconciliation-.csv """ from __future__ import annotations import argparse import asyncio import csv import os import re import sys from collections import Counter from datetime import datetime, timezone from pathlib import Path REPO_ROOT = Path(__file__).resolve().parent.parent sys.path.insert(0, str(REPO_ROOT / "mcp-server" / "src")) if "POSTGRES_URL" not in os.environ: os.environ["POSTGRES_URL"] = ( f"postgres://{os.environ.get('POSTGRES_USER','legal_ai')}:" f"{os.environ.get('POSTGRES_PASSWORD','')}@" f"{os.environ.get('POSTGRES_HOST','127.0.0.1')}:" f"{os.environ.get('POSTGRES_PORT','5433')}/" f"{os.environ.get('POSTGRES_DB','legal_ai')}" ) AUDIT_DIR = REPO_ROOT / "data" / "audit" # Docket shapes: district-court admin (NNNNN-NN-NN) or classic court (NNNN/YY). _DOCKET_RE = re.compile(r"\d{3,6}-\d{1,2}-\d{2}|\d{1,6}/\d{2}") # Layout / RTL-LTR / bracket marks that wrap citations from OCR + LLM output. _MARKS = dict.fromkeys(map(ord, "‎‏‪‫‬‭‮"), None) # Gershayim variants: ASCII " and Hebrew punctuation ״ (U+05F4), ' and ׳ (U+05F3). def _clean(s: str) -> str: """Drop RTL/LTR marks and the ‏(‏ ‏)‏ wrappers OCR sprays around qualifiers.""" return (s or "").translate(_MARKS).replace("‏", "").strip() def _designator_eq(a: str, b: str) -> bool: """בר\"מ ≡ בר\"ם (final-mem variant); normalize gershayim before compare.""" norm = lambda x: (x or "").replace("״", '"').replace("׳", "'").replace("ם", "מ").strip() return norm(a) == norm(b) def _extract(case_number: str) -> tuple[str | None, str | None, str]: """Return (designator, docket, flag). flag ∈ {OK, NO_DOCKET, MULTI_DOCKET}. designator = leading proceeding token (בג"ץ / עע"מ / בר"מ / עמ"נ / עת"מ / ע"א …). docket = the single docket-shaped token ('/' preserved per chair decision). Canonical = f"{designator} {docket}". 0 or >1 distinct dockets → flag (chair). """ cn = _clean(case_number) dockets = _DOCKET_RE.findall(cn) distinct = list(dict.fromkeys(dockets)) if not distinct: return None, None, "NO_DOCKET" docket = distinct[0] m = _DOCKET_RE.search(cn) prefix = cn[: m.start()].strip() # designator is the first whitespace token of the prefix (a parenthesised # district qualifier, if any, comes after it and is dropped). designator = prefix.split()[0] if prefix.split() else "" flag = "OK" if len(distinct) == 1 else "MULTI_DOCKET" return designator or None, docket, flag def _citation_docket(citation_formatted: str) -> str | None: """First docket-shaped token inside the formatted citation, if any.""" m = _DOCKET_RE.search(_clean(citation_formatted)) return m.group() if m else None def _consistency(docket: str | None, citation_formatted: str) -> str: """OK if case_number docket matches citation docket; MISMATCH if they differ; NO_CITATION if citation empty; CIT_NO_DOCKET if citation has no docket token.""" if not _clean(citation_formatted): return "NO_CITATION" if not docket: return "NO_DOCKET" cd = _citation_docket(citation_formatted) if cd is None: return "CIT_NO_DOCKET" return "OK" if cd == docket else "MISMATCH" async def _build_reconciliation() -> list[dict]: from legal_mcp.services import db pool = await db.get_pool() async with pool.acquire() as conn: rows = await conn.fetch( "SELECT id, case_number, source_kind, coalesce(court,'') AS court, " "coalesce(citation_formatted,'') AS cf " "FROM case_law WHERE source_kind <> 'internal_committee' " "ORDER BY source_kind, case_number") out: list[dict] = [] for r in rows: designator, docket, flag = _extract(r["case_number"]) canonical = f"{designator} {docket}" if designator and docket else (docket or "") cons = _consistency(docket, r["cf"]) cd = _citation_docket(r["cf"]) cit_desig = _clean(r["cf"]).split()[0] if _clean(r["cf"]) else "" desig_flag = "" if designator and cit_desig and cd == docket and not _designator_eq(designator, cit_desig): desig_flag = "DESIG_MISMATCH" changes = bool(canonical) and canonical != _clean(r["case_number"]) out.append({ "id": str(r["id"]), "source_kind": r["source_kind"], "current_case_number": r["case_number"], "proposed_canonical": canonical, "court": r["court"], "citation_formatted": r["cf"], "extract_flag": flag, "consistency": cons, "desig_flag": desig_flag, "will_change": "yes" if changes else "no", }) canon_counts = Counter(d["proposed_canonical"] for d in out if d["proposed_canonical"]) for d in out: d["dup_check"] = "DUP_CHECK" if (d["proposed_canonical"] and canon_counts[d["proposed_canonical"]] > 1) else "" return out def _ts() -> str: return datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") # BLOCKING flags forbid auto-apply (identity is uncertain/corrupt — chair must adjudicate). # ADVISORY flags are surfaced for review but do NOT block the case_number fix, because the # docket extraction is still deterministic and unambiguous (e.g. NO_CITATION = the display # citation is missing, an orthogonal backfill gap — it does not make the docket wrong). def _is_blocking(r: dict) -> bool: return bool(r["extract_flag"] != "OK" or r["consistency"] in {"MISMATCH", "CIT_NO_DOCKET"} or r["desig_flag"] or r["dup_check"]) def _is_flagged(r: dict) -> bool: """Anything worth showing the chair (blocking + advisory NO_CITATION).""" return _is_blocking(r) or r["consistency"] == "NO_CITATION" def _write_table(rows: list[dict], ts: str) -> tuple[Path, Path]: AUDIT_DIR.mkdir(parents=True, exist_ok=True) csv_path = AUDIT_DIR / f"fu2c-reconciliation-{ts}.csv" md_path = AUDIT_DIR / f"fu2c-reconciliation-{ts}.md" cols = ["id", "source_kind", "current_case_number", "proposed_canonical", "court", "citation_formatted", "extract_flag", "consistency", "desig_flag", "dup_check", "will_change"] with csv_path.open("w", newline="", encoding="utf-8") as f: w = csv.DictWriter(f, fieldnames=cols) w.writeheader() w.writerows(rows) changing = [r for r in rows if r["will_change"] == "yes"] flagged = [r for r in rows if _is_flagged(r)] with md_path.open("w", encoding="utf-8") as f: f.write(f"# FU-2c — טבלת-תיאום מזהים חיצוניים (case_law non-internal) — {ts}\n\n") f.write(f"- סה\"כ רשומות: {len(rows)}\n- ישתנו: {len(changing)}\n- מסומנות לסקירה: {len(flagged)}\n\n") f.write("## דורש הכרעת-יו\"ר (flags)\n\n") f.write("BLOCK = חוסם auto-apply (זהות לא-ודאית); ADVISORY = תיקון case_number בטוח, פער נלווה.\n\n") f.write("| current_case_number | proposed_canonical | flags | gate |\n|---|---|---|---|\n") for r in flagged: fl = " ".join(x for x in [ r["extract_flag"] if r["extract_flag"] != "OK" else "", r["consistency"] if r["consistency"] in {"MISMATCH", "NO_CITATION", "CIT_NO_DOCKET"} else "", r["desig_flag"], r["dup_check"]] if x) gate = "BLOCK" if _is_blocking(r) else "ADVISORY" f.write(f"| {r['current_case_number'][:55]} | {r['proposed_canonical']} | {fl} | {gate} |\n") f.write("\n## שינויים שיוחלו ב-apply (will_change=yes, לא-חוסם — כולל ADVISORY)\n\n") f.write("| current_case_number | → proposed_canonical |\n|---|---|\n") for r in changing: if _is_blocking(r): continue f.write(f"| {r['current_case_number'][:60]} | {r['proposed_canonical']} |\n") return csv_path, md_path def _load_overrides(overrides_csv: Path | None) -> dict[str, dict]: """Chair per-record adjudication of BLOCKING rows. id → {canonical, citation, reason}. Columns: id, proposed_canonical, reason (required); citation_formatted (optional — when present, the record's display citation is reconciled too). Each row is an explicit, audited chair decision that unblocks one record (e.g. a consolidated judgment whose lead docket the deterministic extractor cannot choose on its own).""" if overrides_csv is None: return {} out: dict[str, dict] = {} with overrides_csv.open(encoding="utf-8") as f: for r in csv.DictReader(f): cid, canon = r.get("id"), (r.get("proposed_canonical") or "").strip() if cid and canon: out[cid] = { "canonical": canon, "citation": (r.get("citation_formatted") or "").strip(), "reason": (r.get("reason") or "").strip(), } return out async def _apply(approved_csv: Path, overrides_csv: Path | None, ts: str) -> dict: from legal_mcp.services import db overrides = _load_overrides(overrides_csv) with approved_csv.open(encoding="utf-8") as f: all_rows = [r for r in csv.DictReader(f) if r.get("will_change") == "yes"] # Decide the target per row. SCOPE: apply only to source_kind='external_upload' # (the reviewed FU-2c target, task #68) OR an explicit chair override. cited_only / # nevo_seed stay in the reconciliation VIEW (so DUP_CHECK spans the full external # unique space) but are NOT migrated here — they are a separate, unreviewed category. # Per row: non-blocking → proposed_canonical (NO_CITATION is advisory, still safe); # blocking → only via override. An override may also carry citation_formatted. plan: list[dict] = [] # {id, canonical, citation|None, source} skipped_blocking: list[str] = [] skipped_out_of_scope = 0 for r in all_rows: in_scope = r.get("source_kind") == "external_upload" or r["id"] in overrides if not in_scope: skipped_out_of_scope += 1 continue blocking = _is_blocking({ "extract_flag": r.get("extract_flag", "OK"), "consistency": r.get("consistency", ""), "desig_flag": r.get("desig_flag", ""), "dup_check": r.get("dup_check", ""), }) if r["id"] in overrides: ov = overrides[r["id"]] plan.append({"id": r["id"], "canonical": ov["canonical"], "citation": ov["citation"] or None, "source": f"override:{ov['reason']}"}) elif not blocking and r.get("proposed_canonical"): plan.append({"id": r["id"], "canonical": r["proposed_canonical"], "citation": None, "source": "auto"}) elif blocking: skipped_blocking.append(r["id"]) if not plan: return {"applied": 0, "note": "no applicable rows", "skipped_blocking": skipped_blocking, "skipped_out_of_scope": skipped_out_of_scope} AUDIT_DIR.mkdir(parents=True, exist_ok=True) backup = AUDIT_DIR / f"fu2c-backup-{ts}.csv" pool = await db.get_pool() applied = 0 cit_applied = 0 collisions: list[str] = [] with backup.open("w", newline="", encoding="utf-8") as bf: bw = csv.writer(bf) bw.writerow(["id", "old_case_number", "new_case_number", "old_citation", "new_citation", "source"]) async with pool.acquire() as conn: for p in plan: rec = await conn.fetchrow( "SELECT case_number, coalesce(citation_formatted,'') AS cf FROM case_law WHERE id=$1", p["id"]) if rec is None: continue # Pre-flight collision guard: the external unique index spans ALL # source_kind<>'internal_committee'. Skip if another row already holds # the target value, rather than letting the UPDATE raise (e.g. a cited_only # reference that pre-existed the uploaded precedent → needs dedup, not migrate). if p["canonical"] != rec["case_number"]: clash = await conn.fetchval( "SELECT id FROM case_law WHERE case_number=$1 " "AND source_kind <> 'internal_committee' AND id <> $2", p["canonical"], p["id"]) if clash is not None: collisions.append(f"{p['id']}→{p['canonical']} (clash {clash})") continue new_cit = p["citation"] if p["citation"] is not None else rec["cf"] bw.writerow([p["id"], rec["case_number"], p["canonical"], rec["cf"], new_cit, p["source"]]) if p["citation"] is not None: await conn.execute( "UPDATE case_law SET case_number=$2, citation_formatted=$3 WHERE id=$1 " "AND source_kind <> 'internal_committee'", p["id"], p["canonical"], p["citation"]) cit_applied += 1 else: await conn.execute( "UPDATE case_law SET case_number=$2 WHERE id=$1 " "AND source_kind <> 'internal_committee'", p["id"], p["canonical"]) applied += 1 return {"applied": applied, "citations_fixed": cit_applied, "overrides": len(overrides), "skipped_blocking": skipped_blocking, "skipped_out_of_scope": skipped_out_of_scope, "collisions": collisions, "backup": str(backup)} async def main() -> int: parser = argparse.ArgumentParser(description="FU-2c external case_number reconciliation") parser.add_argument("--apply", action="store_true", help="apply approved changes (default: dry-run)") parser.add_argument("--approved", type=str, help="path to chair-approved CSV (required with --apply)") parser.add_argument("--overrides", type=str, help="optional CSV (id,proposed_canonical,reason) of " "chair-adjudicated BLOCKING rows to unblock (e.g. consolidated-judgment lead docket)") args = parser.parse_args() ts = _ts() if not args.apply: rows = await _build_reconciliation() csv_path, md_path = _write_table(rows, ts) changing = sum(1 for r in rows if r["will_change"] == "yes") flagged = sum(1 for r in rows if _is_flagged(r)) print(f"DRY-RUN: {len(rows)} rows | will_change={changing} | flagged={flagged}") print(f" table: {md_path}") print(f" csv: {csv_path}") print("Review the table with the chair, then run --apply --approved .") return 0 if not args.approved: print("ERROR: --apply requires --approved (the chair-reviewed table).", file=sys.stderr) return 2 result = await _apply(Path(args.approved), Path(args.overrides) if args.overrides else None, ts) print(f"APPLIED: {result}") return 0 if __name__ == "__main__": sys.exit(asyncio.run(main()))