#!/usr/bin/env python3 """Backfill canonical_halachot table from existing halachot + equivalent_halachot. WHAT THIS DOES -------------- 1. Finds connected components in equivalent_halachot (transitive closure). 2. For each cluster of ≥2 equivalent halachot: picks a canonical representative (highest approved corroboration, then highest confidence, then earliest created), creates ONE canonical_halachot row, and sets canonical_id on all cluster members. 3. For singleton halachot (not in any cluster): creates a 1:1 canonical. 4. Updates halacha_citation_corroboration.canonical_id from halachot.canonical_id. 5. Refreshes canonical_halachot.instance_count. 6. Marks cluster non-representative instances as instance_type='citation'. The backfill sets canonical_statement = representative's rule_statement. A subsequent LLM synthesis pass (backfill_canonical_synthesis.py, Phase 4) will replace this with a broader synthesized statement and set review_status='pending_review'. Until then, review_status stays 'pending_synthesis'. IDEMPOTENCY ----------- Halachot with canonical_id already set are skipped. Re-running only fills gaps. USAGE ----- cd ~/legal-ai/mcp-server .venv/bin/python ../scripts/backfill_canonical_halachot.py # dry-run .venv/bin/python ../scripts/backfill_canonical_halachot.py --apply # execute .venv/bin/python ../scripts/backfill_canonical_halachot.py --apply --verbose """ from __future__ import annotations import argparse import asyncio import os import sys from collections import defaultdict from datetime import datetime, timezone from uuid import UUID sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "mcp-server", "src")) from legal_mcp.services import db # noqa: E402 # ── connected-components helpers ────────────────────────────────────── def _build_components(pairs: list[tuple[UUID, UUID]]) -> list[set[UUID]]: """Union-find over (a, b) pairs → list of connected-component sets.""" parent: dict[UUID, UUID] = {} def find(x: UUID) -> UUID: while parent.get(x, x) != x: parent[x] = parent.get(parent.get(x, x), parent.get(x, x)) x = parent.get(x, x) return x def union(a: UUID, b: UUID) -> None: ra, rb = find(a), find(b) if ra != rb: parent[rb] = ra for a, b in pairs: union(a, b) groups: dict[UUID, set[UUID]] = defaultdict(set) all_nodes = {n for pair in pairs for n in pair} for node in all_nodes: groups[find(node)].add(node) return list(groups.values()) # ── main ────────────────────────────────────────────────────────────── async def _run(apply: bool, verbose: bool) -> None: pool = await db.get_pool() async with pool.acquire() as conn: # ── 1. Load equivalent_halachot pairs ──────────────────────── pair_rows = await conn.fetch( "SELECT halacha_a, halacha_b FROM equivalent_halachot" ) pairs: list[tuple[UUID, UUID]] = [(r["halacha_a"], r["halacha_b"]) for r in pair_rows] components = _build_components(pairs) clustered_ids: set[UUID] = {h for c in components for h in c} print(f"equivalent_halachot pairs: {len(pairs)}") print(f"connected components (clusters ≥2): {len(components)}") # ── 2. Load all halachot that still need canonical_id ───────── all_rows = await conn.fetch( "SELECT h.id, h.rule_statement, h.rule_type, h.practice_areas, " " h.subject_tags, h.embedding, h.case_law_id, h.confidence, " " h.review_status, h.created_at, " " COALESCE(cor.pos, 0) AS corroboration_count " "FROM halachot h " "LEFT JOIN (" " SELECT halacha_id, COUNT(DISTINCT source_citation_id) FILTER " " (WHERE treatment IN ('followed','explained')) AS pos " " FROM halacha_citation_corroboration GROUP BY halacha_id" ") cor ON cor.halacha_id = h.id " "WHERE h.canonical_id IS NULL" ) pending = {r["id"]: dict(r) for r in all_rows} print(f"halachot without canonical_id: {len(pending)}") if not pending: print("✅ nothing to backfill — all halachot already have canonical_id.") return # ── 3. Process clusters ─────────────────────────────────────── def _pick_canonical(members: list[dict]) -> dict: """Best representative: highest corroboration → highest confidence → earliest.""" return max(members, key=lambda r: ( 1 if r["review_status"] in ("approved", "published") else 0, r["corroboration_count"], float(r["confidence"] or 0), -r["created_at"].timestamp(), )) canonical_created = 0 halacha_updated = 0 for component in components: members = [pending[h] for h in component if h in pending] if not members: continue # cluster fully backfilled already rep = _pick_canonical(members) if verbose: print(f"\n cluster({len(members)}) rep={rep['id']} " f"corr={rep['corroboration_count']} " f"status={rep['review_status']}") if apply: canonical_id = await conn.fetchval( "INSERT INTO canonical_halachot " "(canonical_statement, rule_type, practice_areas, subject_tags, " " embedding, first_established_in, review_status, instance_count) " "VALUES ($1,$2,$3,$4,$5,$6,'pending_synthesis',$7) RETURNING id", rep["rule_statement"] or "", rep["rule_type"] or "interpretive", rep["practice_areas"] or [], rep["subject_tags"] or [], rep["embedding"], rep["case_law_id"], len(members), ) canonical_created += 1 for m in members: itype = "original" if m["id"] == rep["id"] else "citation" await conn.execute( "UPDATE halachot SET canonical_id=$1, instance_type=$2, " "updated_at=now() WHERE id=$3", canonical_id, itype, m["id"], ) halacha_updated += 1 # ── 4. Process singletons (no equivalent links) ─────────────── singletons = [r for r in pending.values() if r["id"] not in clustered_ids] print(f"\nsingletons (no equivalent links): {len(singletons)}") for r in singletons: if verbose: print(f" singleton: {r['id']}") if apply: canonical_id = await conn.fetchval( "INSERT INTO canonical_halachot " "(canonical_statement, rule_type, practice_areas, subject_tags, " " embedding, first_established_in, review_status, instance_count) " "VALUES ($1,$2,$3,$4,$5,$6,'pending_synthesis',1) RETURNING id", r["rule_statement"] or "", r["rule_type"] or "interpretive", r["practice_areas"] or [], r["subject_tags"] or [], r["embedding"], r["case_law_id"], ) canonical_created += 1 await conn.execute( "UPDATE halachot SET canonical_id=$1, instance_type='original', " "updated_at=now() WHERE id=$2", canonical_id, r["id"], ) halacha_updated += 1 # ── 5. Backfill halacha_citation_corroboration.canonical_id ─── if apply: result = await conn.execute( "UPDATE halacha_citation_corroboration hcc " "SET canonical_id = h.canonical_id " "FROM halachot h " "WHERE hcc.halacha_id = h.id " " AND hcc.canonical_id IS NULL " " AND h.canonical_id IS NOT NULL" ) corr_updated = int(result.split()[-1]) print(f"\ncorroboration rows backfilled: {corr_updated}") # ── 6. Summary ──────────────────────────────────────────────── if apply: remaining = await conn.fetchval( "SELECT COUNT(*) FROM halachot WHERE canonical_id IS NULL" ) canonical_total = await conn.fetchval( "SELECT COUNT(*) FROM canonical_halachot" ) print(f"\n✅ backfill complete") print(f" canonical_halachot rows: {canonical_total}") print(f" halachot updated: {halacha_updated}") print(f" halachot still without canonical_id: {remaining}") else: cluster_halachot = sum( len([m for m in c if m in pending]) for c in components ) print(f"\n[dry-run] would create:") print(f" canonical_halachot for {len(components)} clusters " f"({cluster_halachot} halachot) + {len(singletons)} singletons") print(f" = ~{len(components) + len(singletons)} canonical principles " f"from {len(pending)} halachot instances") print(" Run with --apply to execute.") def main() -> None: ap = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) ap.add_argument("--apply", action="store_true", help="Actually write to DB (default: dry-run, prints only)") ap.add_argument("--verbose", "-v", action="store_true", help="Print each cluster/singleton as it is processed") args = ap.parse_args() asyncio.run(_run(apply=args.apply, verbose=args.verbose)) if __name__ == "__main__": main()