feat(migration): FU-2c — reconcile external case_law identifiers (GAP-08, #68)

External court precedents stored the full citation (designator + docket +
parties + Nevo date) inside case_number, violating INV-ID2/G1 (citation as
identifier). Chair decision 2026-05-31 (Option A): canonical external
case_number = proceeding-designator + docket, '/' preserved (court
convention, not X1's '/'→'-'); parties/court/date → citation_formatted.

scripts/fu2c_reconcile_external_case_numbers.py — deterministic dry-run →
chair-review → apply, mirroring FU-2b:
- extracts designator+docket; flags split into BLOCKING (MISMATCH /
  CIT_NO_DOCKET / DESIG_MISMATCH / DUP_CHECK / NO_DOCKET) vs ADVISORY
  (NO_CITATION — case_number fix still deterministic, missing citation is a
  separate gap), so advisory rows apply while uncertain identity does not.
- --overrides CSV (id,proposed_canonical,citation_formatted,reason) for
  audited chair adjudication of blocking rows.
- apply scoped to source_kind='external_upload' (task target) while keeping
  cited_only/nevo_seed in the reconciliation VIEW so DUP_CHECK spans the full
  external unique space; pre-flight collision guard before every UPDATE.

Applied to production 2026-05-31: 21 case_number normalized + 3
citation_formatted reconciled (D = consolidated Supreme Court judgment
לויתן/קלמנוביץ → lead docket 25226-04-25; 2×C empty citations composed from
metadata). אהוד שפר עע"מ 317/10 deferred — cross-source duplicate with an
existing cited_only reference (collision guard held; → #70). 49 cited_only
records out of scope → new task #70 (committee-form NNNN-NN dockets the
extractor misses, dedup, unresolvable "ערר אדלר"). Extraction + gating
verified offline on all 24 records.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-31 14:12:45 +00:00
parent 9dbc1bafbf
commit 4fce9d503f
3 changed files with 366 additions and 10 deletions

View File

@@ -0,0 +1,340 @@
#!/usr/bin/env python3
"""FU-2c — reconcile external case_law case_number → canonical designator+docket.
External court precedents stored the FULL citation (proceeding designator +
docket + district qualifier + parties + Nevo date) inside case_number. The
canonical form (chair decision 2026-05-31, Option A; consistent with db.py:369
comment "עע\"מ 3975/22" and INV-ID2/X1) is **proceeding-designator + docket
only**, with court-docket '/' PRESERVED (court convention, NOT normalized to
'-'). Parties / court-name / district-qualifier / Nevo-date live in
citation_formatted (the display field), never in the identifier.
Current : עע"מ (מינהליים ת"א) 14306-09-23 עדינה בולקינד נ' הוועדה... (נבו 11.2.2024)
Target : עמ"נ 14306-09-23 (designator + docket; '/' kept where present)
DETERMINISTIC — no LLM. The docket is the single docket-shaped token; the
designator is the leading proceeding token. 0 dockets, >1 distinct dockets, a
citation whose docket disagrees with case_number, or an empty citation are
FLAGGED for chair review, never guessed/auto-applied.
Scope: source_kind <> 'internal_committee' (external_upload / cited_only /
nevo_seed) — the external partial-unique space (X1 §2, uq_case_law_external_number).
internal_committee was handled by FU-2b. FK-safe: all case_law FKs reference
case_law.id (UUID), not case_number.
Usage (must use the mcp-server venv — asyncpg/pgvector vendored there):
PY=/home/chaim/legal-ai/mcp-server/.venv/bin/python
# Dry-run (default): builds the reconciliation table for chair review.
$PY scripts/fu2c_reconcile_external_case_numbers.py
# Apply ONLY chair-approved rows (after Dafna's review), backup first:
$PY scripts/fu2c_reconcile_external_case_numbers.py --apply \
--approved data/audit/fu2c-reconciliation-<ts>.csv
"""
from __future__ import annotations
import argparse
import asyncio
import csv
import os
import re
import sys
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path
REPO_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(REPO_ROOT / "mcp-server" / "src"))
if "POSTGRES_URL" not in os.environ:
os.environ["POSTGRES_URL"] = (
f"postgres://{os.environ.get('POSTGRES_USER','legal_ai')}:"
f"{os.environ.get('POSTGRES_PASSWORD','')}@"
f"{os.environ.get('POSTGRES_HOST','127.0.0.1')}:"
f"{os.environ.get('POSTGRES_PORT','5433')}/"
f"{os.environ.get('POSTGRES_DB','legal_ai')}"
)
AUDIT_DIR = REPO_ROOT / "data" / "audit"
# Docket shapes: district-court admin (NNNNN-NN-NN) or classic court (NNNN/YY).
_DOCKET_RE = re.compile(r"\d{3,6}-\d{1,2}-\d{2}|\d{1,6}/\d{2}")
# Layout / RTL-LTR / bracket marks that wrap citations from OCR + LLM output.
_MARKS = dict.fromkeys(map(ord, ""), None)
# Gershayim variants: ASCII " and Hebrew punctuation ״ (U+05F4), ' and ׳ (U+05F3).
def _clean(s: str) -> str:
"""Drop RTL/LTR marks and the ( ) wrappers OCR sprays around qualifiers."""
return (s or "").translate(_MARKS).replace("", "").strip()
def _designator_eq(a: str, b: str) -> bool:
"""בר\"מ ≡ בר\"ם (final-mem variant); normalize gershayim before compare."""
norm = lambda x: (x or "").replace("״", '"').replace("׳", "'").replace("ם", "מ").strip()
return norm(a) == norm(b)
def _extract(case_number: str) -> tuple[str | None, str | None, str]:
"""Return (designator, docket, flag). flag ∈ {OK, NO_DOCKET, MULTI_DOCKET}.
designator = leading proceeding token (בג"ץ / עע"מ / בר"מ / עמ"נ / עת"מ / ע"א …).
docket = the single docket-shaped token ('/' preserved per chair decision).
Canonical = f"{designator} {docket}". 0 or >1 distinct dockets → flag (chair).
"""
cn = _clean(case_number)
dockets = _DOCKET_RE.findall(cn)
distinct = list(dict.fromkeys(dockets))
if not distinct:
return None, None, "NO_DOCKET"
docket = distinct[0]
m = _DOCKET_RE.search(cn)
prefix = cn[: m.start()].strip()
# designator is the first whitespace token of the prefix (a parenthesised
# district qualifier, if any, comes after it and is dropped).
designator = prefix.split()[0] if prefix.split() else ""
flag = "OK" if len(distinct) == 1 else "MULTI_DOCKET"
return designator or None, docket, flag
def _citation_docket(citation_formatted: str) -> str | None:
"""First docket-shaped token inside the formatted citation, if any."""
m = _DOCKET_RE.search(_clean(citation_formatted))
return m.group() if m else None
def _consistency(docket: str | None, citation_formatted: str) -> str:
"""OK if case_number docket matches citation docket; MISMATCH if they differ;
NO_CITATION if citation empty; CIT_NO_DOCKET if citation has no docket token."""
if not _clean(citation_formatted):
return "NO_CITATION"
if not docket:
return "NO_DOCKET"
cd = _citation_docket(citation_formatted)
if cd is None:
return "CIT_NO_DOCKET"
return "OK" if cd == docket else "MISMATCH"
async def _build_reconciliation() -> list[dict]:
from legal_mcp.services import db
pool = await db.get_pool()
async with pool.acquire() as conn:
rows = await conn.fetch(
"SELECT id, case_number, source_kind, coalesce(court,'') AS court, "
"coalesce(citation_formatted,'') AS cf "
"FROM case_law WHERE source_kind <> 'internal_committee' "
"ORDER BY source_kind, case_number")
out: list[dict] = []
for r in rows:
designator, docket, flag = _extract(r["case_number"])
canonical = f"{designator} {docket}" if designator and docket else (docket or "")
cons = _consistency(docket, r["cf"])
cd = _citation_docket(r["cf"])
cit_desig = _clean(r["cf"]).split()[0] if _clean(r["cf"]) else ""
desig_flag = ""
if designator and cit_desig and cd == docket and not _designator_eq(designator, cit_desig):
desig_flag = "DESIG_MISMATCH"
changes = bool(canonical) and canonical != _clean(r["case_number"])
out.append({
"id": str(r["id"]),
"source_kind": r["source_kind"],
"current_case_number": r["case_number"],
"proposed_canonical": canonical,
"court": r["court"],
"citation_formatted": r["cf"],
"extract_flag": flag,
"consistency": cons,
"desig_flag": desig_flag,
"will_change": "yes" if changes else "no",
})
canon_counts = Counter(d["proposed_canonical"] for d in out if d["proposed_canonical"])
for d in out:
d["dup_check"] = "DUP_CHECK" if (d["proposed_canonical"] and canon_counts[d["proposed_canonical"]] > 1) else ""
return out
def _ts() -> str:
return datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
# BLOCKING flags forbid auto-apply (identity is uncertain/corrupt — chair must adjudicate).
# ADVISORY flags are surfaced for review but do NOT block the case_number fix, because the
# docket extraction is still deterministic and unambiguous (e.g. NO_CITATION = the display
# citation is missing, an orthogonal backfill gap — it does not make the docket wrong).
def _is_blocking(r: dict) -> bool:
return bool(r["extract_flag"] != "OK" or r["consistency"] in {"MISMATCH", "CIT_NO_DOCKET"}
or r["desig_flag"] or r["dup_check"])
def _is_flagged(r: dict) -> bool:
"""Anything worth showing the chair (blocking + advisory NO_CITATION)."""
return _is_blocking(r) or r["consistency"] == "NO_CITATION"
def _write_table(rows: list[dict], ts: str) -> tuple[Path, Path]:
AUDIT_DIR.mkdir(parents=True, exist_ok=True)
csv_path = AUDIT_DIR / f"fu2c-reconciliation-{ts}.csv"
md_path = AUDIT_DIR / f"fu2c-reconciliation-{ts}.md"
cols = ["id", "source_kind", "current_case_number", "proposed_canonical", "court",
"citation_formatted", "extract_flag", "consistency", "desig_flag", "dup_check", "will_change"]
with csv_path.open("w", newline="", encoding="utf-8") as f:
w = csv.DictWriter(f, fieldnames=cols)
w.writeheader()
w.writerows(rows)
changing = [r for r in rows if r["will_change"] == "yes"]
flagged = [r for r in rows if _is_flagged(r)]
with md_path.open("w", encoding="utf-8") as f:
f.write(f"# FU-2c — טבלת-תיאום מזהים חיצוניים (case_law non-internal) — {ts}\n\n")
f.write(f"- סה\"כ רשומות: {len(rows)}\n- ישתנו: {len(changing)}\n- מסומנות לסקירה: {len(flagged)}\n\n")
f.write("## דורש הכרעת-יו\"ר (flags)\n\n")
f.write("BLOCK = חוסם auto-apply (זהות לא-ודאית); ADVISORY = תיקון case_number בטוח, פער נלווה.\n\n")
f.write("| current_case_number | proposed_canonical | flags | gate |\n|---|---|---|---|\n")
for r in flagged:
fl = " ".join(x for x in [
r["extract_flag"] if r["extract_flag"] != "OK" else "",
r["consistency"] if r["consistency"] in {"MISMATCH", "NO_CITATION", "CIT_NO_DOCKET"} else "",
r["desig_flag"], r["dup_check"]] if x)
gate = "BLOCK" if _is_blocking(r) else "ADVISORY"
f.write(f"| {r['current_case_number'][:55]} | {r['proposed_canonical']} | {fl} | {gate} |\n")
f.write("\n## שינויים שיוחלו ב-apply (will_change=yes, לא-חוסם — כולל ADVISORY)\n\n")
f.write("| current_case_number | → proposed_canonical |\n|---|---|\n")
for r in changing:
if _is_blocking(r):
continue
f.write(f"| {r['current_case_number'][:60]} | {r['proposed_canonical']} |\n")
return csv_path, md_path
def _load_overrides(overrides_csv: Path | None) -> dict[str, dict]:
"""Chair per-record adjudication of BLOCKING rows. id → {canonical, citation, reason}.
Columns: id, proposed_canonical, reason (required); citation_formatted (optional —
when present, the record's display citation is reconciled too). Each row is an
explicit, audited chair decision that unblocks one record (e.g. a consolidated
judgment whose lead docket the deterministic extractor cannot choose on its own)."""
if overrides_csv is None:
return {}
out: dict[str, dict] = {}
with overrides_csv.open(encoding="utf-8") as f:
for r in csv.DictReader(f):
cid, canon = r.get("id"), (r.get("proposed_canonical") or "").strip()
if cid and canon:
out[cid] = {
"canonical": canon,
"citation": (r.get("citation_formatted") or "").strip(),
"reason": (r.get("reason") or "").strip(),
}
return out
async def _apply(approved_csv: Path, overrides_csv: Path | None, ts: str) -> dict:
from legal_mcp.services import db
overrides = _load_overrides(overrides_csv)
with approved_csv.open(encoding="utf-8") as f:
all_rows = [r for r in csv.DictReader(f) if r.get("will_change") == "yes"]
# Decide the target per row. SCOPE: apply only to source_kind='external_upload'
# (the reviewed FU-2c target, task #68) OR an explicit chair override. cited_only /
# nevo_seed stay in the reconciliation VIEW (so DUP_CHECK spans the full external
# unique space) but are NOT migrated here — they are a separate, unreviewed category.
# Per row: non-blocking → proposed_canonical (NO_CITATION is advisory, still safe);
# blocking → only via override. An override may also carry citation_formatted.
plan: list[dict] = [] # {id, canonical, citation|None, source}
skipped_blocking: list[str] = []
skipped_out_of_scope = 0
for r in all_rows:
in_scope = r.get("source_kind") == "external_upload" or r["id"] in overrides
if not in_scope:
skipped_out_of_scope += 1
continue
blocking = _is_blocking({
"extract_flag": r.get("extract_flag", "OK"), "consistency": r.get("consistency", ""),
"desig_flag": r.get("desig_flag", ""), "dup_check": r.get("dup_check", ""),
})
if r["id"] in overrides:
ov = overrides[r["id"]]
plan.append({"id": r["id"], "canonical": ov["canonical"],
"citation": ov["citation"] or None, "source": f"override:{ov['reason']}"})
elif not blocking and r.get("proposed_canonical"):
plan.append({"id": r["id"], "canonical": r["proposed_canonical"], "citation": None, "source": "auto"})
elif blocking:
skipped_blocking.append(r["id"])
if not plan:
return {"applied": 0, "note": "no applicable rows", "skipped_blocking": skipped_blocking,
"skipped_out_of_scope": skipped_out_of_scope}
AUDIT_DIR.mkdir(parents=True, exist_ok=True)
backup = AUDIT_DIR / f"fu2c-backup-{ts}.csv"
pool = await db.get_pool()
applied = 0
cit_applied = 0
collisions: list[str] = []
with backup.open("w", newline="", encoding="utf-8") as bf:
bw = csv.writer(bf)
bw.writerow(["id", "old_case_number", "new_case_number", "old_citation", "new_citation", "source"])
async with pool.acquire() as conn:
for p in plan:
rec = await conn.fetchrow(
"SELECT case_number, coalesce(citation_formatted,'') AS cf FROM case_law WHERE id=$1", p["id"])
if rec is None:
continue
# Pre-flight collision guard: the external unique index spans ALL
# source_kind<>'internal_committee'. Skip if another row already holds
# the target value, rather than letting the UPDATE raise (e.g. a cited_only
# reference that pre-existed the uploaded precedent → needs dedup, not migrate).
if p["canonical"] != rec["case_number"]:
clash = await conn.fetchval(
"SELECT id FROM case_law WHERE case_number=$1 "
"AND source_kind <> 'internal_committee' AND id <> $2", p["canonical"], p["id"])
if clash is not None:
collisions.append(f"{p['id']}{p['canonical']} (clash {clash})")
continue
new_cit = p["citation"] if p["citation"] is not None else rec["cf"]
bw.writerow([p["id"], rec["case_number"], p["canonical"], rec["cf"], new_cit, p["source"]])
if p["citation"] is not None:
await conn.execute(
"UPDATE case_law SET case_number=$2, citation_formatted=$3 WHERE id=$1 "
"AND source_kind <> 'internal_committee'",
p["id"], p["canonical"], p["citation"])
cit_applied += 1
else:
await conn.execute(
"UPDATE case_law SET case_number=$2 WHERE id=$1 "
"AND source_kind <> 'internal_committee'",
p["id"], p["canonical"])
applied += 1
return {"applied": applied, "citations_fixed": cit_applied, "overrides": len(overrides),
"skipped_blocking": skipped_blocking, "skipped_out_of_scope": skipped_out_of_scope,
"collisions": collisions, "backup": str(backup)}
async def main() -> int:
parser = argparse.ArgumentParser(description="FU-2c external case_number reconciliation")
parser.add_argument("--apply", action="store_true", help="apply approved changes (default: dry-run)")
parser.add_argument("--approved", type=str, help="path to chair-approved CSV (required with --apply)")
parser.add_argument("--overrides", type=str, help="optional CSV (id,proposed_canonical,reason) of "
"chair-adjudicated BLOCKING rows to unblock (e.g. consolidated-judgment lead docket)")
args = parser.parse_args()
ts = _ts()
if not args.apply:
rows = await _build_reconciliation()
csv_path, md_path = _write_table(rows, ts)
changing = sum(1 for r in rows if r["will_change"] == "yes")
flagged = sum(1 for r in rows if _is_flagged(r))
print(f"DRY-RUN: {len(rows)} rows | will_change={changing} | flagged={flagged}")
print(f" table: {md_path}")
print(f" csv: {csv_path}")
print("Review the table with the chair, then run --apply --approved <reviewed.csv>.")
return 0
if not args.approved:
print("ERROR: --apply requires --approved <csv> (the chair-reviewed table).", file=sys.stderr)
return 2
result = await _apply(Path(args.approved), Path(args.overrides) if args.overrides else None, ts)
print(f"APPLIED: {result}")
return 0
if __name__ == "__main__":
sys.exit(asyncio.run(main()))