Files
legal-ai/scripts/backfill_canonical_halachot.py
Chaim 0c20f2054b
All checks were successful
G12 Leak-Guard / leak-guard (pull_request) Successful in 4s
Lint — undefined names / undefined-names (pull_request) Successful in 10s
feat(halachot): canonical principles model — V41 schema + backfill (Phase 1+2)
Introduces canonical_halachot table: one row per unique legal principle,
replacing the equivalent_halachot bidirectional-link model (V28/G2 improvement).
Per-precedent halachot rows become instances that point to their canonical.

Schema (V41):
- canonical_halachot: canonical_statement, rule_type, practice_areas,
  subject_tags, embedding (ivfflat), review_status (pending_synthesis→published),
  first_established_in FK → case_law, instance_count.
- halachot: +canonical_id FK, +instance_type (original|citation|application),
  +treatment; rule_statement + embedding become nullable for citation instances.
- halacha_citation_corroboration: +canonical_id FK so X11 aggregates at
  principle level, not instance level. store_corroboration auto-populates it
  via INSERT...SELECT.

New DB functions: create_canonical_halacha, nearest_canonical_halacha
(threshold search for Phase 3 lookup-before-insert), refresh_canonical_instance_count,
get_canonical_halacha (principle + instance list).

Backfill: scripts/backfill_canonical_halachot.py — dry-run by default,
--apply to execute. Uses union-find over equivalent_halachot pairs, picks
canonical representative (corroboration→confidence→earliest), creates canonicals,
sets canonical_id + instance_type on all instances.

Invariants: G2 (equivalent_halachot deprecated post-backfill), INV-G10
(canonical review_status gate), INV-DM7 (authority derived, not stored),
INV-AH (canonical_statement grounded in source statements, pending_synthesis
until chair approves).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-17 17:25:08 +00:00

237 lines
10 KiB
Python

#!/usr/bin/env python3
"""Backfill canonical_halachot table from existing halachot + equivalent_halachot.
WHAT THIS DOES
--------------
1. Finds connected components in equivalent_halachot (transitive closure).
2. For each cluster of ≥2 equivalent halachot: picks a canonical representative
(approved/published review_status first, then highest positive corroboration count, then highest confidence, then earliest created),
creates ONE canonical_halachot row, and sets canonical_id on all cluster members.
3. For singleton halachot (not in any cluster): creates a 1:1 canonical.
4. Updates halacha_citation_corroboration.canonical_id from halachot.canonical_id.
5. Refreshes canonical_halachot.instance_count.
6. Marks cluster non-representative instances as instance_type='citation'.
The backfill sets canonical_statement = representative's rule_statement. A
subsequent LLM synthesis pass (backfill_canonical_synthesis.py, Phase 4) will
replace this with a broader synthesized statement and set review_status='pending_review'.
Until then, review_status stays 'pending_synthesis'.
IDEMPOTENCY
-----------
Halachot with canonical_id already set are skipped. Re-running only fills gaps.
USAGE
-----
cd ~/legal-ai/mcp-server
.venv/bin/python ../scripts/backfill_canonical_halachot.py # dry-run
.venv/bin/python ../scripts/backfill_canonical_halachot.py --apply # execute
.venv/bin/python ../scripts/backfill_canonical_halachot.py --apply --verbose
"""
from __future__ import annotations
import argparse
import asyncio
import os
import sys
from collections import defaultdict
from datetime import datetime, timezone
from uuid import UUID
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "mcp-server", "src"))
from legal_mcp.services import db # noqa: E402
# ── connected-components helpers ──────────────────────────────────────
def _build_components(pairs: list[tuple[UUID, UUID]]) -> list[set[UUID]]:
"""Union-find over (a, b) pairs → list of connected-component sets."""
parent: dict[UUID, UUID] = {}
def find(x: UUID) -> UUID:
while parent.get(x, x) != x:
parent[x] = parent.get(parent.get(x, x), parent.get(x, x))
x = parent.get(x, x)
return x
def union(a: UUID, b: UUID) -> None:
ra, rb = find(a), find(b)
if ra != rb:
parent[rb] = ra
for a, b in pairs:
union(a, b)
groups: dict[UUID, set[UUID]] = defaultdict(set)
all_nodes = {n for pair in pairs for n in pair}
for node in all_nodes:
groups[find(node)].add(node)
return list(groups.values())
# ── main ──────────────────────────────────────────────────────────────
def _pick_canonical(members: list[dict]) -> dict:
    """Choose the representative halacha of a cluster.

    Ranking, best first: review_status is 'approved' or 'published', then
    highest corroboration_count, then highest confidence (NULL counts as 0),
    then earliest created_at.
    """
    return max(members, key=lambda r: (
        1 if r["review_status"] in ("approved", "published") else 0,
        r["corroboration_count"],
        float(r["confidence"] or 0),
        -r["created_at"].timestamp(),
    ))


async def _insert_canonical(conn, rep: dict, instance_count: int):
    """Insert one 'pending_synthesis' canonical_halachot row copied from *rep*.

    Returns the id of the new row.
    """
    return await conn.fetchval(
        "INSERT INTO canonical_halachot "
        "(canonical_statement, rule_type, practice_areas, subject_tags, "
        " embedding, first_established_in, review_status, instance_count) "
        "VALUES ($1,$2,$3,$4,$5,$6,'pending_synthesis',$7) RETURNING id",
        rep["rule_statement"] or "",
        rep["rule_type"] or "interpretive",
        rep["practice_areas"] or [],
        rep["subject_tags"] or [],
        rep["embedding"],
        rep["case_law_id"],
        instance_count,
    )


async def _run(apply: bool, verbose: bool) -> None:
    """Backfill canonical_halachot for every halacha that lacks canonical_id.

    Args:
        apply: when False (dry-run) nothing is written and only counts are
            printed; when True the rows are created and updated.
        verbose: print one line per cluster / singleton processed.

    All statements run inside a single transaction, so a failure part-way
    through rolls everything back instead of leaving canonical rows that no
    halacha points to (which a re-run would then duplicate).

    Clusters that were already partially backfilled are handled explicitly:
    the remaining members join the cluster's existing canonical rather than
    getting a second one.  A cluster whose backfilled members point to more
    than one canonical is reported and left untouched.
    """
    pool = await db.get_pool()
    async with pool.acquire() as conn, conn.transaction():
        # ── 1. Load equivalent_halachot pairs ────────────────────────
        pair_rows = await conn.fetch(
            "SELECT halacha_a, halacha_b FROM equivalent_halachot"
        )
        pairs: list[tuple[UUID, UUID]] = [
            (r["halacha_a"], r["halacha_b"]) for r in pair_rows
        ]
        components = _build_components(pairs)
        clustered_ids: set[UUID] = {h for c in components for h in c}
        print(f"equivalent_halachot pairs: {len(pairs)}")
        print(f"connected components (clusters ≥2): {len(components)}")

        # ── 2. Load all halachot that still need canonical_id ─────────
        all_rows = await conn.fetch(
            "SELECT h.id, h.rule_statement, h.rule_type, h.practice_areas, "
            " h.subject_tags, h.embedding, h.case_law_id, h.confidence, "
            " h.review_status, h.created_at, "
            " COALESCE(cor.pos, 0) AS corroboration_count "
            "FROM halachot h "
            "LEFT JOIN ("
            " SELECT halacha_id, COUNT(DISTINCT source_citation_id) FILTER "
            " (WHERE treatment IN ('followed','explained')) AS pos "
            " FROM halacha_citation_corroboration GROUP BY halacha_id"
            ") cor ON cor.halacha_id = h.id "
            "WHERE h.canonical_id IS NULL"
        )
        pending = {r["id"]: dict(r) for r in all_rows}
        print(f"halachot without canonical_id: {len(pending)}")
        if not pending:
            print("✅ nothing to backfill — all halachot already have canonical_id.")
            return

        # Cluster members that already carry a canonical_id (from an earlier
        # run); needed so the rest of their cluster can join that canonical.
        linked_rows = await conn.fetch(
            "SELECT h.id, h.canonical_id FROM halachot h "
            "WHERE h.canonical_id IS NOT NULL AND EXISTS ("
            " SELECT 1 FROM equivalent_halachot e "
            " WHERE e.halacha_a = h.id OR e.halacha_b = h.id)"
        )
        existing_canonical = {r["id"]: r["canonical_id"] for r in linked_rows}

        # ── 3. Process clusters ───────────────────────────────────────
        canonical_created = 0
        halacha_updated = 0
        new_clusters = 0
        new_cluster_halachot = 0
        joined_clusters = 0
        joined_halachot = 0
        skipped_clusters = 0
        for component in components:
            members = [pending[h] for h in component if h in pending]
            if not members:
                continue  # cluster fully backfilled already

            prior = {
                existing_canonical[h] for h in component if h in existing_canonical
            }
            if len(prior) > 1:
                # Two previously separate canonicals are now linked; choosing
                # one automatically would hide a merge that needs review.
                skipped_clusters += 1
                print(f"\n WARNING: cluster with {len(members)} pending halachot "
                      f"spans {len(prior)} existing canonicals "
                      f"({', '.join(sorted(map(str, prior)))}) — skipped, "
                      f"merge manually")
                continue

            if prior:
                (canonical_id,) = prior
                joined_clusters += 1
                joined_halachot += len(members)
                if verbose:
                    print(f"\n cluster({len(members)}) joins existing "
                          f"canonical={canonical_id}")
                if apply:
                    for m in members:
                        # The canonical already has its 'original' instance.
                        await conn.execute(
                            "UPDATE halachot SET canonical_id=$1, "
                            "instance_type='citation', updated_at=now() "
                            "WHERE id=$2",
                            canonical_id, m["id"],
                        )
                        halacha_updated += 1
                    await conn.execute(
                        "UPDATE canonical_halachot SET instance_count = "
                        "(SELECT COUNT(*) FROM halachot WHERE canonical_id = $1) "
                        "WHERE id = $1",
                        canonical_id,
                    )
                continue

            rep = _pick_canonical(members)
            new_clusters += 1
            new_cluster_halachot += len(members)
            if verbose:
                print(f"\n cluster({len(members)}) rep={rep['id']} "
                      f"corr={rep['corroboration_count']} "
                      f"status={rep['review_status']}")
            if apply:
                canonical_id = await _insert_canonical(conn, rep, len(members))
                canonical_created += 1
                for m in members:
                    itype = "original" if m["id"] == rep["id"] else "citation"
                    await conn.execute(
                        "UPDATE halachot SET canonical_id=$1, instance_type=$2, "
                        "updated_at=now() WHERE id=$3",
                        canonical_id, itype, m["id"],
                    )
                    halacha_updated += 1

        # ── 4. Process singletons (no equivalent links) ───────────────
        singletons = [r for r in pending.values() if r["id"] not in clustered_ids]
        print(f"\nsingletons (no equivalent links): {len(singletons)}")
        for r in singletons:
            if verbose:
                print(f" singleton: {r['id']}")
            if apply:
                canonical_id = await _insert_canonical(conn, r, 1)
                canonical_created += 1
                await conn.execute(
                    "UPDATE halachot SET canonical_id=$1, instance_type='original', "
                    "updated_at=now() WHERE id=$2",
                    canonical_id, r["id"],
                )
                halacha_updated += 1

        # ── 5. Backfill halacha_citation_corroboration.canonical_id ───
        if apply:
            result = await conn.execute(
                "UPDATE halacha_citation_corroboration hcc "
                "SET canonical_id = h.canonical_id "
                "FROM halachot h "
                "WHERE hcc.halacha_id = h.id "
                " AND hcc.canonical_id IS NULL "
                " AND h.canonical_id IS NOT NULL"
            )
            # The status string looks like "UPDATE <n>"; the last token is the row count.
            corr_updated = int(result.split()[-1])
            print(f"\ncorroboration rows backfilled: {corr_updated}")

        # ── 6. Summary ────────────────────────────────────────────────
        if apply:
            remaining = await conn.fetchval(
                "SELECT COUNT(*) FROM halachot WHERE canonical_id IS NULL"
            )
            canonical_total = await conn.fetchval(
                "SELECT COUNT(*) FROM canonical_halachot"
            )
            print("\n✅ backfill complete")
            print(f" canonical_halachot rows: {canonical_total}")
            print(f" canonical_halachot rows created: {canonical_created}")
            print(f" halachot updated: {halacha_updated}")
            print(f" halachot still without canonical_id: {remaining}")
            if skipped_clusters:
                print(f" clusters skipped (conflicting canonicals): {skipped_clusters}")
        else:
            print("\n[dry-run] would create:")
            print(f" canonical_halachot for {new_clusters} clusters "
                  f"({new_cluster_halachot} halachot) + {len(singletons)} singletons")
            print(f" = ~{new_clusters + len(singletons)} canonical principles "
                  f"from {len(pending)} halachot instances")
            if joined_clusters:
                print(f" would attach {joined_halachot} halachot to "
                      f"{joined_clusters} existing canonicals")
            if skipped_clusters:
                print(f" would skip {skipped_clusters} clusters "
                      f"(conflicting canonicals)")
            print(" Run with --apply to execute.")
def main() -> None:
    """Parse the command line and run the backfill (dry-run unless --apply)."""
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--apply",
        action="store_true",
        help="Actually write to DB (default: dry-run, prints only)",
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Print each cluster/singleton as it is processed",
    )
    options = parser.parse_args()
    asyncio.run(_run(apply=options.apply, verbose=options.verbose))


if __name__ == "__main__":
    main()