Files
legal-ai/scripts/fu2c_reconcile_external_case_numbers.py
Chaim 4fce9d503f feat(migration): FU-2c — reconcile external case_law identifiers (GAP-08, #68)
External court precedents stored the full citation (designator + docket +
parties + Nevo date) inside case_number, violating INV-ID2/G1 (citation as
identifier). Chair decision 2026-05-31 (Option A): canonical external
case_number = proceeding-designator + docket, '/' preserved (court
convention, not X1's '/'→'-'); parties/court/date → citation_formatted.

scripts/fu2c_reconcile_external_case_numbers.py — deterministic dry-run →
chair-review → apply, mirroring FU-2b:
- extracts designator+docket; flags split into BLOCKING (MISMATCH /
  CIT_NO_DOCKET / DESIG_MISMATCH / DUP_CHECK / NO_DOCKET) vs ADVISORY
  (NO_CITATION — case_number fix still deterministic, missing citation is a
  separate gap), so advisory rows apply while uncertain identity does not.
- --overrides CSV (id,proposed_canonical,citation_formatted,reason) for
  audited chair adjudication of blocking rows.
- apply scoped to source_kind='external_upload' (task target) while keeping
  cited_only/nevo_seed in the reconciliation VIEW so DUP_CHECK spans the full
  external unique space; pre-flight collision guard before every UPDATE.

Applied to production 2026-05-31: 21 case_number normalized + 3
citation_formatted reconciled (D = consolidated Supreme Court judgment
לויתן/קלמנוביץ → lead docket 25226-04-25; 2×C empty citations composed from
metadata). אהוד שפר עע"מ 317/10 deferred — cross-source duplicate with an
existing cited_only reference (collision guard held; → #70). 49 cited_only
records out of scope → new task #70 (committee-form NNNN-NN dockets the
extractor misses, dedup, unresolvable "ערר אדלר"). Extraction + gating
verified offline on all 24 records.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 14:12:45 +00:00

341 lines
17 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""FU-2c — reconcile external case_law case_number → canonical designator+docket.
External court precedents stored the FULL citation (proceeding designator +
docket + district qualifier + parties + Nevo date) inside case_number. The
canonical form (chair decision 2026-05-31, Option A; consistent with db.py:369
comment "עע\"מ 3975/22" and INV-ID2/X1) is **proceeding-designator + docket
only**, with court-docket '/' PRESERVED (court convention, NOT normalized to
'-'). Parties / court-name / district-qualifier / Nevo-date live in
citation_formatted (the display field), never in the identifier.
Current : עע"מ (מינהליים ת"א) 14306-09-23 עדינה בולקינד נ' הוועדה... (נבו 11.2.2024)
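Target : עמ"נ 14306-09-23 (designator + docket; '/' kept where present)
NOTE(review): the two designators in this example differ (עע"מ vs עמ"נ), while the
extractor keeps the leading token unchanged — confirm which one the example should show.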
DETERMINISTIC — no LLM. The docket is the single docket-shaped token; the
designator is the leading proceeding token. 0 dockets, >1 distinct dockets, a
citation whose docket disagrees with case_number, or an empty citation are
FLAGGED for chair review, never guessed/auto-applied.
Scope: source_kind <> 'internal_committee' (external_upload / cited_only /
nevo_seed) — the external partial-unique space (X1 §2, uq_case_law_external_number).
internal_committee was handled by FU-2b. FK-safe: all case_law FKs reference
case_law.id (UUID), not case_number.
Usage (must use the mcp-server venv — asyncpg/pgvector vendored there):
PY=/home/chaim/legal-ai/mcp-server/.venv/bin/python
# Dry-run (default): builds the reconciliation table for chair review.
$PY scripts/fu2c_reconcile_external_case_numbers.py
# Apply ONLY chair-approved rows (after Dafna's review), backup first:
$PY scripts/fu2c_reconcile_external_case_numbers.py --apply \
--approved data/audit/fu2c-reconciliation-<ts>.csv
"""
from __future__ import annotations
import argparse
import asyncio
import csv
import os
import re
import sys
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path
REPO_ROOT = Path(__file__).resolve().parent.parent
# Make mcp-server/src importable so the function-level `legal_mcp` imports resolve
# when this file is run as a standalone script.
sys.path.insert(0, str(REPO_ROOT / "mcp-server" / "src"))

# Build a DSN from the discrete POSTGRES_* variables only when the caller has not
# supplied POSTGRES_URL. User and password are percent-encoded so that reserved
# characters ('@', ':', '/', '#', ...) inside a credential cannot corrupt the URL.
# NOTE(review): assumes the DSN consumer percent-decodes credentials (asyncpg does).
if "POSTGRES_URL" not in os.environ:
    from urllib.parse import quote as _quote

    os.environ["POSTGRES_URL"] = (
        f"postgres://{_quote(os.environ.get('POSTGRES_USER', 'legal_ai'), safe='')}:"
        f"{_quote(os.environ.get('POSTGRES_PASSWORD', ''), safe='')}@"
        f"{os.environ.get('POSTGRES_HOST', '127.0.0.1')}:"
        f"{os.environ.get('POSTGRES_PORT', '5433')}/"
        f"{os.environ.get('POSTGRES_DB', 'legal_ai')}"
    )

# Directory receiving the reconciliation tables and the pre-apply backups.
AUDIT_DIR = REPO_ROOT / "data" / "audit"

# Docket shapes: district-court admin (NNNNN-NN-NN) or classic court (NNNN/YY).
_DOCKET_RE = re.compile(r"\d{3,6}-\d{1,2}-\d{2}|\d{1,6}/\d{2}")
# Layout / RTL-LTR / bracket marks that wrap citations from OCR + LLM output.
# Spelled as escapes on purpose: literal invisible characters are silently dropped
# by editors, web viewers and copy/paste, which turns this table into a no-op.
# NOTE(review): confirm this set covers every mark the original literal contained.
_MARKS = dict.fromkeys(
    map(
        ord,
        "\u200b\u200c\u200d\u2060"        # zero-width space / non-joiner / joiner / word joiner
        "\u200e\u200f\u061c"              # LRM / RLM / Arabic letter mark
        "\u202a\u202b\u202c\u202d\u202e"  # LRE / RLE / PDF / LRO / RLO
        "\u2066\u2067\u2068\u2069"        # LRI / RLI / FSI / PDI (isolate "brackets")
        "\ufeff",                         # BOM / zero-width no-break space
    ),
    None,
)


# Gershayim variants (normalized in _designator_eq, not here): ASCII " and Hebrew
# punctuation ״ (U+05F4), ' and ׳ (U+05F3).
def _clean(s: str) -> str:
    """Return *s* without invisible directional/zero-width marks, stripped.

    ``None`` or an empty value yields ``""``. Only the code points listed in
    ``_MARKS`` are removed; visible characters — including ASCII parentheses
    around a district qualifier — are left untouched.
    """
    return (s or "").translate(_MARKS).strip()
def _designator_eq(a: str, b: str) -> bool:
    """Tell whether two proceeding designators are the same abbreviation.

    Before comparing, Hebrew gershayim/geresh (U+05F4 / U+05F3) are folded to the
    ASCII double/single quote, final mem is folded to regular mem (so בר\"ם
    equals בר\"מ), and surrounding whitespace is stripped. ``None`` is treated as
    an empty string.
    """
    folding = str.maketrans({"״": '"', "׳": "'", "ם": "מ"})

    def _fold(token: str) -> str:
        return (token or "").translate(folding).strip()

    return _fold(a) == _fold(b)
def _extract(case_number: str) -> tuple[str | None, str | None, str]:
    """Split a stored case_number into ``(designator, docket, flag)``.

    * docket — the first docket-shaped token found by ``_DOCKET_RE`` ('/' is kept
      exactly as written).
    * designator — the first whitespace-separated token that precedes that
      docket (בג"ץ / עע"מ / בר"מ / עמ"נ / עת"מ / ע"א …). Anything between it and
      the docket, such as a parenthesised district qualifier, is discarded.
      ``None`` when nothing precedes the docket; the flag is still ``OK`` then.
    * flag — ``NO_DOCKET`` when no docket is present (designator and docket are
      both ``None``), ``MULTI_DOCKET`` when more than one distinct docket
      appears, otherwise ``OK``.
    """
    text = _clean(case_number)
    first_hit = _DOCKET_RE.search(text)
    if first_hit is None:
        return None, None, "NO_DOCKET"

    # Repeats of the same docket are not ambiguous; only distinct values are.
    unique_dockets = set(_DOCKET_RE.findall(text))
    leading_tokens = text[: first_hit.start()].split()
    designator = leading_tokens[0] if leading_tokens else None
    flag = "MULTI_DOCKET" if len(unique_dockets) > 1 else "OK"
    return designator, first_hit.group(), flag
def _citation_docket(citation_formatted: str) -> str | None:
    """Return the first docket-shaped token of the display citation, or ``None``."""
    hit = _DOCKET_RE.search(_clean(citation_formatted))
    if hit is None:
        return None
    return hit.group()
def _consistency(docket: str | None, citation_formatted: str) -> str:
    """Cross-check the case_number docket against the display citation.

    Returns, in order of precedence:
      NO_CITATION    the citation is empty after cleaning;
      NO_DOCKET      no docket was extracted from case_number;
      CIT_NO_DOCKET  the citation holds no docket-shaped token;
      OK / MISMATCH  the citation's first docket equals / differs from *docket*.
    """
    cleaned = _clean(citation_formatted)
    if not cleaned:
        return "NO_CITATION"
    if not docket:
        return "NO_DOCKET"

    hit = _DOCKET_RE.search(cleaned)
    if hit is None:
        return "CIT_NO_DOCKET"
    if hit.group() == docket:
        return "OK"
    return "MISMATCH"
def _reconcile_row(row) -> dict:
    """Derive the reconciliation record for one fetched case_law row (no I/O).

    *row* must expose ``id``, ``case_number``, ``source_kind``, ``court`` and
    ``cf`` (the citation, already coalesced to ``''``) by key.
    """
    stored_number = row["case_number"]
    citation = row["cf"]

    designator, docket, flag = _extract(stored_number)
    canonical = f"{designator} {docket}" if designator and docket else (docket or "")

    citation_tokens = _clean(citation).split()
    cit_desig = citation_tokens[0] if citation_tokens else ""
    desig_flag = ""
    # Only compare designators when both sides talk about the same docket.
    if (
        designator
        and cit_desig
        and _citation_docket(citation) == docket
        and not _designator_eq(designator, cit_desig)
    ):
        desig_flag = "DESIG_MISMATCH"

    # Compare against the value actually stored, not its cleaned form: a row that
    # differs from canonical only by wrapped marks/whitespace still needs an UPDATE.
    changes = bool(canonical) and canonical != stored_number

    return {
        "id": str(row["id"]),
        "source_kind": row["source_kind"],
        "current_case_number": stored_number,
        "proposed_canonical": canonical,
        "court": row["court"],
        "citation_formatted": citation,
        "extract_flag": flag,
        "consistency": _consistency(docket, citation),
        "desig_flag": desig_flag,
        "will_change": "yes" if changes else "no",
    }


async def _build_reconciliation() -> list[dict]:
    """Build the dry-run reconciliation table for every non-internal case_law row.

    Reads all rows with ``source_kind <> 'internal_committee'``, derives the
    proposed canonical identifier and the review flags for each, then marks with
    ``DUP_CHECK`` every row whose proposed canonical is shared by another row in
    the result. Nothing is written to the database.

    Returns:
        One dict per row with the keys id, source_kind, current_case_number,
        proposed_canonical, court, citation_formatted, extract_flag,
        consistency, desig_flag, will_change and dup_check.
    """
    from legal_mcp.services import db

    pool = await db.get_pool()
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            "SELECT id, case_number, source_kind, coalesce(court,'') AS court, "
            "coalesce(citation_formatted,'') AS cf "
            "FROM case_law WHERE source_kind <> 'internal_committee' "
            "ORDER BY source_kind, case_number")

    out: list[dict] = [_reconcile_row(r) for r in rows]

    # Two rows converging on the same identifier would collide once applied.
    canon_counts = Counter(d["proposed_canonical"] for d in out if d["proposed_canonical"])
    for d in out:
        duplicated = bool(d["proposed_canonical"]) and canon_counts[d["proposed_canonical"]] > 1
        d["dup_check"] = "DUP_CHECK" if duplicated else ""
    return out
def _ts() -> str:
    """Return the current UTC time as a compact stamp, e.g. ``20260531T141245Z``."""
    now_utc = datetime.now(tz=timezone.utc)
    return f"{now_utc:%Y%m%dT%H%M%SZ}"
# Gate policy.
# BLOCKING: identity is uncertain or corrupt, so the row is never auto-applied and
# the chair must adjudicate it.
# ADVISORY: shown for review but the case_number fix still goes through, because
# the docket extraction stays deterministic and unambiguous (NO_CITATION only means
# the display citation is missing — a separate backfill gap, not a wrong docket).
def _is_blocking(r: dict) -> bool:
    """Return True when any flag on *r* forbids auto-applying its proposed value.

    Blocking conditions: ``extract_flag`` other than ``OK``; ``consistency`` of
    ``MISMATCH`` or ``CIT_NO_DOCKET``; a non-empty ``desig_flag``; a non-empty
    ``dup_check``.
    """
    if r["extract_flag"] != "OK":
        return True
    if r["consistency"] in ("MISMATCH", "CIT_NO_DOCKET"):
        return True
    return bool(r["desig_flag"] or r["dup_check"])
def _is_flagged(r: dict) -> bool:
    """Return True for rows the chair should see: blocking ones plus NO_CITATION."""
    if _is_blocking(r):
        return True
    return r["consistency"] == "NO_CITATION"
def _write_table(rows: list[dict], ts: str) -> tuple[Path, Path]:
    """Write the reconciliation table as a CSV plus a Markdown summary.

    Both files go to ``AUDIT_DIR`` (created if needed) and carry *ts* in their
    names. The CSV holds every row; the Markdown lists the flagged rows with
    their gate (BLOCK / ADVISORY) and then the non-blocking rows that would
    change on apply.

    Returns:
        ``(csv_path, md_path)``.
    """
    AUDIT_DIR.mkdir(parents=True, exist_ok=True)
    csv_path = AUDIT_DIR / f"fu2c-reconciliation-{ts}.csv"
    md_path = AUDIT_DIR / f"fu2c-reconciliation-{ts}.md"

    header = ["id", "source_kind", "current_case_number", "proposed_canonical", "court",
              "citation_formatted", "extract_flag", "consistency", "desig_flag", "dup_check", "will_change"]
    with csv_path.open("w", newline="", encoding="utf-8") as csv_file:
        table = csv.DictWriter(csv_file, fieldnames=header)
        table.writeheader()
        for row in rows:
            table.writerow(row)

    changing = [row for row in rows if row["will_change"] == "yes"]
    flagged = [row for row in rows if _is_flagged(row)]
    reportable = {"MISMATCH", "NO_CITATION", "CIT_NO_DOCKET"}

    md: list[str] = [
        f"# FU-2c — טבלת-תיאום מזהים חיצוניים (case_law non-internal) — {ts}\n\n",
        f"- סה\"כ רשומות: {len(rows)}\n- ישתנו: {len(changing)}\n- מסומנות לסקירה: {len(flagged)}\n\n",
        "## דורש הכרעת-יו\"ר (flags)\n\n",
        "BLOCK = חוסם auto-apply (זהות לא-ודאית); ADVISORY = תיקון case_number בטוח, פער נלווה.\n\n",
        "| current_case_number | proposed_canonical | flags | gate |\n|---|---|---|---|\n",
    ]
    for row in flagged:
        # Only the non-trivial flag values are shown; empty ones are dropped.
        candidates = (
            "" if row["extract_flag"] == "OK" else row["extract_flag"],
            row["consistency"] if row["consistency"] in reportable else "",
            row["desig_flag"],
            row["dup_check"],
        )
        fl = " ".join(filter(None, candidates))
        gate = "BLOCK" if _is_blocking(row) else "ADVISORY"
        md.append(f"| {row['current_case_number'][:55]} | {row['proposed_canonical']} | {fl} | {gate} |\n")

    md.append("\n## שינויים שיוחלו ב-apply (will_change=yes, לא-חוסם — כולל ADVISORY)\n\n")
    md.append("| current_case_number | → proposed_canonical |\n|---|---|\n")
    md.extend(
        f"| {row['current_case_number'][:60]} | {row['proposed_canonical']} |\n"
        for row in changing
        if not _is_blocking(row)
    )
    md_path.write_text("".join(md), encoding="utf-8")
    return csv_path, md_path
def _load_overrides(overrides_csv: Path | None) -> dict[str, dict]:
    """Load the chair's per-record adjudication of BLOCKING rows.

    Expected columns: ``id``, ``proposed_canonical``, ``reason``; optionally
    ``citation_formatted`` — when non-empty, the record's display citation is
    reconciled too. Each row is an explicit, audited chair decision that unblocks
    one record (e.g. a consolidated judgment whose lead docket the deterministic
    extractor cannot choose on its own).

    Args:
        overrides_csv: path to the overrides CSV, or ``None`` for no overrides.

    Returns:
        ``{id: {"canonical", "citation", "reason"}}`` with every value stripped.
        Rows lacking an id or a proposed_canonical are skipped; a later row with
        the same id replaces an earlier one.
    """
    if overrides_csv is None:
        return {}

    out: dict[str, dict] = {}
    # utf-8-sig: a BOM written by spreadsheet tools would otherwise turn the first
    # header into "\ufeffid", silently discarding every override. newline="" is the
    # csv-module requirement for correct handling of quoted line breaks.
    with overrides_csv.open(newline="", encoding="utf-8-sig") as f:
        for r in csv.DictReader(f):
            # Hand-edited ids often pick up stray spaces; they must still match.
            cid = (r.get("id") or "").strip()
            canon = (r.get("proposed_canonical") or "").strip()
            if not (cid and canon):
                continue
            out[cid] = {
                "canonical": canon,
                "citation": (r.get("citation_formatted") or "").strip(),
                "reason": (r.get("reason") or "").strip(),
            }
    return out
def _plan_updates(rows: list[dict], overrides: dict[str, dict]) -> tuple[list[dict], list[str], int]:
    """Turn reviewed CSV rows into an update plan (pure — no I/O).

    Returns ``(plan, skipped_blocking, skipped_out_of_scope)``; each plan entry is
    ``{id, canonical, citation|None, source}``.
    """
    plan: list[dict] = []
    skipped_blocking: list[str] = []
    skipped_out_of_scope = 0
    for r in rows:
        ov = overrides.get(r["id"])
        if ov is not None:
            # An explicit chair decision wins over the scope filter and every flag.
            plan.append({"id": r["id"], "canonical": ov["canonical"],
                         "citation": ov["citation"] or None, "source": f"override:{ov['reason']}"})
            continue
        if r.get("source_kind") != "external_upload":
            skipped_out_of_scope += 1
            continue
        blocking = _is_blocking({
            "extract_flag": r.get("extract_flag", "OK"), "consistency": r.get("consistency", ""),
            "desig_flag": r.get("desig_flag", ""), "dup_check": r.get("dup_check", ""),
        })
        if blocking:
            skipped_blocking.append(r["id"])
        elif r.get("proposed_canonical"):
            plan.append({"id": r["id"], "canonical": r["proposed_canonical"], "citation": None, "source": "auto"})
    return plan, skipped_blocking, skipped_out_of_scope


async def _apply(approved_csv: Path, overrides_csv: Path | None, ts: str) -> dict:
    """Apply the chair-reviewed reconciliation to case_law.

    SCOPE: only ``source_kind='external_upload'`` rows (the reviewed FU-2c target,
    task #68) or rows carrying an explicit chair override are migrated. cited_only /
    nevo_seed stay in the reconciliation VIEW (so DUP_CHECK spans the full external
    unique space) but are NOT migrated here.
    Per row: non-blocking → proposed_canonical (NO_CITATION is advisory, still safe);
    blocking → only via override. An override may also carry citation_formatted, and
    is honoured even when the reviewed row has ``will_change != 'yes'`` (e.g. a
    NO_DOCKET row, for which no canonical could be proposed).

    Args:
        approved_csv: the chair-reviewed reconciliation CSV.
        overrides_csv: optional overrides CSV (see ``_load_overrides``).
        ts: timestamp used in the backup file name.

    Returns:
        A summary dict: applied (rows updated), citations_fixed (the subset that
        also rewrote citation_formatted), overrides, overrides_unmatched (override
        ids absent from the reviewed CSV), skipped_blocking, skipped_out_of_scope,
        collisions, missing (planned ids no longer in the table) and backup.
        When nothing is applicable a shorter dict with a ``note`` is returned and
        the database is not touched.
    """
    from legal_mcp.services import db

    overrides = _load_overrides(overrides_csv)
    # utf-8-sig tolerates a BOM added when the reviewed table went through a spreadsheet.
    with approved_csv.open(newline="", encoding="utf-8-sig") as f:
        reviewed = list(csv.DictReader(f))

    # Keep rows that change on their own AND rows the chair adjudicated explicitly;
    # filtering on will_change alone would drop overrides for rows that had no proposal.
    all_rows = [r for r in reviewed if r.get("will_change") == "yes" or r.get("id") in overrides]
    overrides_unmatched = sorted(set(overrides) - {r.get("id") for r in reviewed})

    plan, skipped_blocking, skipped_out_of_scope = _plan_updates(all_rows, overrides)
    if not plan:
        return {"applied": 0, "note": "no applicable rows", "skipped_blocking": skipped_blocking,
                "skipped_out_of_scope": skipped_out_of_scope, "overrides_unmatched": overrides_unmatched}

    AUDIT_DIR.mkdir(parents=True, exist_ok=True)
    backup = AUDIT_DIR / f"fu2c-backup-{ts}.csv"
    pool = await db.get_pool()
    applied = 0
    cit_applied = 0
    collisions: list[str] = []
    missing: list[str] = []
    with backup.open("w", newline="", encoding="utf-8") as bf:
        bw = csv.writer(bf)
        bw.writerow(["id", "old_case_number", "new_case_number", "old_citation", "new_citation", "source"])
        async with pool.acquire() as conn:
            for p in plan:
                rec = await conn.fetchrow(
                    "SELECT case_number, coalesce(citation_formatted,'') AS cf FROM case_law WHERE id=$1", p["id"])
                if rec is None:
                    # Row disappeared since the dry-run; report it instead of hiding it.
                    missing.append(p["id"])
                    continue
                # Pre-flight collision guard: the external unique index spans ALL
                # source_kind<>'internal_committee'. Skip if another row already holds
                # the target value, rather than letting the UPDATE raise (e.g. a cited_only
                # reference that pre-existed the uploaded precedent → needs dedup, not migrate).
                if p["canonical"] != rec["case_number"]:
                    clash = await conn.fetchval(
                        "SELECT id FROM case_law WHERE case_number=$1 "
                        "AND source_kind <> 'internal_committee' AND id <> $2", p["canonical"], p["id"])
                    if clash is not None:
                        collisions.append(f"{p['id']} → {p['canonical']} (clash {clash})")
                        continue
                new_cit = p["citation"] if p["citation"] is not None else rec["cf"]
                bw.writerow([p["id"], rec["case_number"], p["canonical"], rec["cf"], new_cit, p["source"]])
                # The old values must be on disk before the row is mutated.
                bf.flush()
                if p["citation"] is not None:
                    await conn.execute(
                        "UPDATE case_law SET case_number=$2, citation_formatted=$3 WHERE id=$1 "
                        "AND source_kind <> 'internal_committee'",
                        p["id"], p["canonical"], p["citation"])
                    cit_applied += 1
                else:
                    await conn.execute(
                        "UPDATE case_law SET case_number=$2 WHERE id=$1 "
                        "AND source_kind <> 'internal_committee'",
                        p["id"], p["canonical"])
                # NOTE(review): counted for every updated row, with or without a
                # citation rewrite — confirm this matches the intended reporting.
                applied += 1
    return {"applied": applied, "citations_fixed": cit_applied, "overrides": len(overrides),
            "overrides_unmatched": overrides_unmatched,
            "skipped_blocking": skipped_blocking, "skipped_out_of_scope": skipped_out_of_scope,
            "collisions": collisions, "missing": missing, "backup": str(backup)}
async def main() -> int:
    """Command-line entry point; returns the process exit code.

    Without ``--apply`` a dry-run is performed: the reconciliation table is built
    and written as CSV + Markdown for chair review (exit 0). With ``--apply`` the
    reviewed CSV given by ``--approved`` (plus optional ``--overrides``) is applied
    (exit 0). A missing ``--approved`` argument or a non-existent input file is a
    usage error (exit 2) reported on stderr.
    """
    parser = argparse.ArgumentParser(description="FU-2c external case_number reconciliation")
    parser.add_argument("--apply", action="store_true", help="apply approved changes (default: dry-run)")
    parser.add_argument("--approved", type=str, help="path to chair-approved CSV (required with --apply)")
    parser.add_argument("--overrides", type=str,
                        help="optional CSV (id,proposed_canonical,reason; optional citation_formatted) of "
                             "chair-adjudicated BLOCKING rows to unblock (e.g. consolidated-judgment lead docket)")
    args = parser.parse_args()
    ts = _ts()

    if not args.apply:
        rows = await _build_reconciliation()
        csv_path, md_path = _write_table(rows, ts)
        changing = sum(1 for r in rows if r["will_change"] == "yes")
        flagged = sum(1 for r in rows if _is_flagged(r))
        print(f"DRY-RUN: {len(rows)} rows | will_change={changing} | flagged={flagged}")
        print(f" table: {md_path}")
        print(f" csv: {csv_path}")
        print("Review the table with the chair, then run --apply --approved <reviewed.csv>.")
        return 0

    if not args.approved:
        print("ERROR: --apply requires --approved <csv> (the chair-reviewed table).", file=sys.stderr)
        return 2

    approved = Path(args.approved)
    overrides = Path(args.overrides) if args.overrides else None
    # Fail with the same usage-error code before anything touches the database,
    # instead of surfacing a FileNotFoundError traceback.
    for option, path in (("--approved", approved), ("--overrides", overrides)):
        if path is not None and not path.is_file():
            print(f"ERROR: {option} file not found: {path}", file=sys.stderr)
            return 2

    result = await _apply(approved, overrides, ts)
    print(f"APPLIED: {result}")
    return 0
# Script entry point: run the async CLI and hand its return value to the
# interpreter as the process exit status.
if __name__ == "__main__":
    raise SystemExit(asyncio.run(main()))