feat(halachot): canonical principles model — V41 schema + backfill (Phase 1+2) #298

Merged
chaim merged 1 commits from worktree-canonical-halachot into main 2026-06-17 17:27:53 +00:00
3 changed files with 430 additions and 5 deletions

View File

@@ -1585,6 +1585,90 @@ CREATE INDEX IF NOT EXISTS idx_missing_precedents_citation_norm
"""
# ── V41: canonical_halachot ──────────────────────────────────────────
# Replaces the equivalent_halachot bidirectional-link model (V28) with a
# first-class canonical entity. Instead of recording that halacha A ≡ halacha B,
# we now have ONE canonical_halachot row that BOTH A and B point to.
#
# Each legal PRINCIPLE is defined ONCE here (canonical_statement = LLM-
# synthesized abstraction, grounded in source statements per INV-AH). The
# per-precedent halachot rows become INSTANCES that link to the canonical and
# carry only their own quote, treatment, and context.
#
# Extraction pipeline change (Phase 3, separate PR): lookup-before-insert —
# embed new extraction, cosine-search canonical_halachot (≥0.85); if match,
# store a thin 'citation' instance; if not, create new canonical + 'original'
# instance. This eliminates per-extraction duplication of the same principle.
#
# INV-DM7: authority (binding/persuasive) derived from
# first_established_in.precedent_level — never stored on canonical.
# INV-G10: only 'published' canonicals reach drafting agents.
# INV-AH: canonical_statement grounded in source statements, never invented;
# review_status='pending_synthesis' until chair verifies.
# G2: equivalent_halachot (V28) deprecated post-backfill (no parallel path).
SCHEMA_V41_SQL = """
-- One row per unique legal principle across all precedents.
CREATE TABLE IF NOT EXISTS canonical_halachot (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
canonical_statement TEXT NOT NULL,
rule_type TEXT NOT NULL DEFAULT 'interpretive',
practice_areas TEXT[] NOT NULL DEFAULT '{}',
subject_tags TEXT[] NOT NULL DEFAULT '{}',
embedding vector(1024),
review_status TEXT NOT NULL DEFAULT 'pending_synthesis'
CHECK (review_status IN
('pending_synthesis','pending_review','approved','published','rejected')),
first_established_in UUID REFERENCES case_law(id) ON DELETE SET NULL,
instance_count INT NOT NULL DEFAULT 0,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS idx_canonical_halachot_status
ON canonical_halachot(review_status);
CREATE INDEX IF NOT EXISTS idx_canonical_halachot_practice
ON canonical_halachot USING gin(practice_areas);
CREATE INDEX IF NOT EXISTS idx_canonical_halachot_tags
ON canonical_halachot USING gin(subject_tags);
CREATE INDEX IF NOT EXISTS idx_canonical_halachot_vec
ON canonical_halachot USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 30);
-- halachot: canonical linkage + role columns.
-- canonical_id: NULL until backfill_canonical_halachot.py runs; 100% filled after.
-- instance_type: role of this precedent's mention of the principle.
-- 'original' = the precedent that FIRST established the principle (source)
-- 'citation' = a later precedent that cites/applies the principle
-- 'application'= a later precedent that applies the principle to new facts
-- treatment: how this precedent's mention relates to the canonical principle.
-- Parallels halacha_citation_corroboration.treatment (X11) but for precedents
-- (X11 tracks citations from internal decisions; this tracks per-precedent treatment).
-- rule_statement + embedding become nullable: citation instances inherit these
-- from canonical_halachot. 'original' instances keep their own stored values.
ALTER TABLE halachot
ADD COLUMN IF NOT EXISTS canonical_id UUID
REFERENCES canonical_halachot(id) ON DELETE SET NULL,
ADD COLUMN IF NOT EXISTS instance_type TEXT NOT NULL DEFAULT 'original'
CHECK (instance_type IN ('original','citation','application')),
ADD COLUMN IF NOT EXISTS treatment TEXT NOT NULL DEFAULT 'mentioned';
ALTER TABLE halachot ALTER COLUMN rule_statement DROP NOT NULL;
ALTER TABLE halachot ALTER COLUMN embedding DROP NOT NULL;
CREATE INDEX IF NOT EXISTS idx_halachot_canonical ON halachot(canonical_id);
CREATE INDEX IF NOT EXISTS idx_halachot_instance_type ON halachot(instance_type);
-- halacha_citation_corroboration (X11) gains canonical_id so the signal
-- aggregates at the principle level rather than the per-instance level.
-- Backfill: UPDATE halacha_citation_corroboration SET canonical_id =
-- (SELECT canonical_id FROM halachot WHERE id = halacha_id).
-- halacha_id is retained for audit trail.
ALTER TABLE halacha_citation_corroboration
ADD COLUMN IF NOT EXISTS canonical_id UUID
REFERENCES canonical_halachot(id) ON DELETE CASCADE;
CREATE INDEX IF NOT EXISTS idx_hcc_canonical
ON halacha_citation_corroboration(canonical_id)
WHERE canonical_id IS NOT NULL;
"""
# Stable, arbitrary key for the session-level advisory lock that serialises
# schema DDL across processes. Every short-lived process (cron drains, services)
# re-runs the idempotent migrations on startup; without this lock two processes
@@ -1602,7 +1686,7 @@ async def _run_schema_migrations(pool: asyncpg.Pool) -> None:
await _apply_schema_ddl(conn)
finally:
await conn.execute("SELECT pg_advisory_unlock($1)", _MIGRATION_LOCK_KEY)
logger.info("Database schema initialized (v1-v40)")
logger.info("Database schema initialized (v1-v41)")
async def _apply_schema_ddl(conn: asyncpg.Connection) -> None:
@@ -1647,6 +1731,7 @@ async def _apply_schema_ddl(conn: asyncpg.Connection) -> None:
await conn.execute(SCHEMA_V38_SQL)
await conn.execute(SCHEMA_V39_SQL)
await conn.execute(SCHEMA_V40_SQL)
await conn.execute(SCHEMA_V41_SQL)
async def init_schema() -> None:
@@ -5764,12 +5849,20 @@ async def store_corroboration(
s_id = _UUID(source_id) if isinstance(source_id, str) else source_id
cl_id = _UUID(citing_case_law_id) if (citing_case_law_id and isinstance(citing_case_law_id, str)) else citing_case_law_id
d_id = _UUID(citing_decision_id) if (citing_decision_id and isinstance(citing_decision_id, str)) else citing_decision_id
# INSERT ... SELECT so we can pull canonical_id from halachot in one round-trip.
# canonical_id is NULL until backfill_canonical_halachot.py runs; COALESCE keeps
# existing canonical_id on conflict so a pre-backfill row is upgraded when the
# same corroboration is re-stored post-backfill.
await pool.execute(
"INSERT INTO halacha_citation_corroboration "
"(halacha_id, citing_case_law_id, citing_decision_id, source_citation_id, treatment, match_score, match_context) "
"VALUES ($1,$2,$3,$4,$5,$6,$7) "
"(halacha_id, canonical_id, citing_case_law_id, citing_decision_id, "
" source_citation_id, treatment, match_score, match_context) "
"SELECT $1, h.canonical_id, $2, $3, $4, $5, $6, $7 "
"FROM halachot h WHERE h.id = $1 "
"ON CONFLICT (halacha_id, source_citation_id) DO UPDATE SET "
"treatment=EXCLUDED.treatment, match_score=EXCLUDED.match_score",
"treatment=EXCLUDED.treatment, match_score=EXCLUDED.match_score, "
"canonical_id=COALESCE(EXCLUDED.canonical_id, "
" halacha_citation_corroboration.canonical_id)",
h_id, cl_id, d_id, s_id, treatment, score, context,
)
@@ -5846,6 +5939,101 @@ async def list_equivalent_for_halacha(halacha_id: UUID) -> list[dict]:
]
# ── Canonical halachot (V41) ─────────────────────────────────────────────────
async def create_canonical_halacha(
statement: str,
rule_type: str = "interpretive",
practice_areas: list[str] | None = None,
subject_tags: list[str] | None = None,
embedding: list[float] | None = None,
first_established_in: "UUID | None" = None,
review_status: str = "pending_synthesis",
) -> "UUID":
"""Insert a new canonical principle and return its id."""
pool = await get_pool()
row = await pool.fetchrow(
"INSERT INTO canonical_halachot "
"(canonical_statement, rule_type, practice_areas, subject_tags, "
" embedding, first_established_in, review_status) "
"VALUES ($1,$2,$3,$4,$5,$6,$7) RETURNING id",
statement,
rule_type,
practice_areas or [],
subject_tags or [],
embedding,
first_established_in,
review_status,
)
return row["id"]
async def nearest_canonical_halacha(
vec: list[float],
threshold: float = 0.85,
status_filter: tuple[str, ...] = ("approved", "published"),
) -> "tuple[str, float] | None":
"""Return (canonical_id, cosine_sim) of the nearest approved/published canonical
whose cosine similarity to `vec` meets `threshold`, or None if none qualifies.
Used by the extractor's lookup-before-insert (Phase 3) to detect whether
a newly extracted principle already exists in the registry.
"""
pool = await get_pool()
row = await pool.fetchrow(
"SELECT id::text AS id, 1 - (embedding <=> $1) AS sim "
"FROM canonical_halachot "
"WHERE embedding IS NOT NULL AND review_status = ANY($2::text[]) "
"ORDER BY embedding <=> $1 LIMIT 1",
vec, list(status_filter),
)
if not row:
return None
sim = float(row["sim"])
return (row["id"], sim) if sim >= threshold else None
async def refresh_canonical_instance_count(canonical_id: "UUID") -> None:
"""Recount halachot rows pointing to this canonical and update instance_count."""
pool = await get_pool()
await pool.execute(
"UPDATE canonical_halachot SET "
"instance_count = (SELECT COUNT(*) FROM halachot WHERE canonical_id = $1), "
"updated_at = now() "
"WHERE id = $1",
canonical_id,
)
async def get_canonical_halacha(canonical_id: "UUID") -> "dict | None":
"""Fetch one canonical principle with its instance list."""
pool = await get_pool()
row = await pool.fetchrow(
"SELECT ch.id::text, ch.canonical_statement, ch.rule_type, "
" ch.practice_areas, ch.subject_tags, ch.review_status, "
" ch.instance_count, ch.created_at, ch.updated_at, "
" cl.case_number AS first_established_case "
"FROM canonical_halachot ch "
"LEFT JOIN case_law cl ON cl.id = ch.first_established_in "
"WHERE ch.id = $1",
canonical_id,
)
if not row:
return None
instances = await pool.fetch(
"SELECT h.id::text, h.instance_type, h.treatment, h.supporting_quote, "
" h.page_reference, h.review_status AS instance_status, "
" cl.case_number, cl.case_name "
"FROM halachot h JOIN case_law cl ON cl.id = h.case_law_id "
"WHERE h.canonical_id = $1 ORDER BY h.instance_type, cl.case_number",
canonical_id,
)
return {
**dict(row),
"instances": [dict(i) for i in instances],
}
async def _annotate_equivalents(pool, out: list[dict]) -> None:
"""Attach an `equivalents` list to each row (#84.2) — parallel-authority links.

View File

@@ -64,7 +64,8 @@
| `halacha_panel_audit.py` | python | **רשת-ביטחון לפאנל** (selective-prediction monitoring) — דוגם הלכות שאושרו ע"י הפאנל (`reviewer LIKE 'panel:%'`), מריץ עליהן **שוב** את הצבעת-ה-KEEP של 3 השופטים, ומציף כל מקרה שכעת נוטה DROP (false-keep פוטנציאלי). report-only כברירת-מחדל; `--flag` מחזיר את ה-flips ל-`pending_review` לסקירת-יו"ר. `--sample N`/`--seed`. בסיס 2026-06-07: 0/15. מיועד להרצה תקופתית (שבועי). מייבא שופטים מ-`halacha_panel_approve`. **חובה מקומי**. | תקופתי (שבועי) — ניטור |
| `halacha_panel_calibrate.py` | python | **כיול + מדידת הפאנל** (Trust-or-Escalate, ICLR 2025). `--source live` (ברירת-מחדל): מריץ את שאלת-ה-KEEP על מדגם-הזהב ומודד מול `is_holding` precision+coverage+**split-rate** לכל מדיניות + false-keep/false-drop (מייבא שופטים מ-`halacha_panel_approve`, **חובה מקומי**). **#133/FU-5** — `--source captured`: **אפס-עלות** (בלי re-vote/LLM) — מצליב סבבים שמורים (FU-1) מול הכרעות-יו"ר (FU-2) דרך `db.panel_rounds_vs_chair` ומדווח split-rate+auto-precision **לכל סבב** (מגמת הלולאה: ככל שהרובריקה משתפרת precision נשמר ו-split יורד); משתף את `analyze_pairs` של FU-4 (מקור-יחיד). שתי המדידות מדווחות **anon-stability** (מבחן-אנונימיזציה #81.7) כמטריקת-בריאות נגד echo-chamber. `--batch`/`--limit`/`--concurrency`. | ידני — לפני חיווט `--apply` (live) / תקופתי — מעקב-לולאה (captured) |
| `halacha_rubric_distill.py` | python | **#133/FU-4 — זיקוק-רובריקה PROPOSE-ONLY.** מצליב `halacha_panel_rounds` (FU-1, הצבעות+נימוקים) מול הכרעות-היו"ר (FU-2, seeds ב-`halacha_goldset` batch `chair-live`) דרך `db.panel_rounds_vs_chair` (read-only), מנתח דטרמיניסטית **כשלים שיטתיים** (false-keep/false-drop, פיצולים-שהוכרעו, שיעור-מחלוקת-עם-היו"ר לכל שופט), ומציע `KEEP_SYSTEM` v2 + exemplars מופשטים (claude_session מקומי, אפס עלות) כ**דוח-diff** ל-`data/learning/rubric-proposal-<ts>.md`. **לעולם לא auto-apply** — אימוץ v2 = עריכה אנושית של הקבוע דרך PR (INV-LRN1); exemplars מופשטים בלבד (INV-LRN5); הסיגנל היחיד = הכרעת-יו"ר, לא הצבעות-פאנל (anti-echo). מתחת ל-12 זוגות → "אין מספיק נתונים". `--no-llm` (סטטיסטיקה בלבד) / `--limit N`. **חובה מקומי**. | תקופתי — אחרי שהצטברו הכרעות-יו"ר על מחלוקות-פאנל |
| `halacha_batch_reconcile.py` | python | **#82.7** — dedup חוצה-פסקים offline (שמרני, **dry-run בלבד**). dedup-on-insert משווה רק תוך-פסק; כאן סף מחמיר (cosine ≥0.95, `--cosine`) ולא-הרסני: מאתר זוגות הלכות near-duplicate בין פסקים שונים (pgvector `<=>` exact) עם איתות לקסיקלי (Jaccard/Levenshtein) ומדווח ל-CSV ב-`data/audit/` לסקירת היו"ר. לא מדלג/ממזג/מוחק. `--include-pending`. **`--link`** רושם את הזוגות שנמצאו כ-`equivalent_halachot` (parallel authority, #84.2 — קישור-מקביל ברמת-הלכה, **לא** ציטוט; idempotent, לא-הרסני). רץ עם venv של mcp-server. אומת: 800 הלכות → 5 זוגות (קושרו). | ידני — דוח-סקירה / `--link` לקישור |
| `backfill_canonical_halachot.py` | python | **V41 — הקמת מודל ההלכות הקנוניות (חד-פעמי + idempotent).** (1) בונה רכיבים-קשורים (connected components) מ-`equivalent_halachot` (transitive closure — union-find). (2) לכל אשכול: בוחר נציג-קנוני (הכי הרבה corroboration → confidence → earliest), יוצר שורת `canonical_halachot`, ומעדכן `canonical_id` + `instance_type` לכל חברי האשכול. (3) לסינגלטונים (ללא קישורי-שוויון): 1:1 canonical. (4) מאכלס `halacha_citation_corroboration.canonical_id` מ-`halachot.canonical_id`. `--dry-run` (ברירת-מחדל, מחשב ומדווח בלבד) / `--apply` (כותב) / `--verbose`. לאחר הרצה: `canonical_statement` = ניסוח-נציג (pending_synthesis); עוקב: `backfill_canonical_synthesis.py` (Phase 4) יסנתז ניסוח-רחב דרך LLM. הרץ: `mcp-server/.venv/bin/python scripts/backfill_canonical_halachot.py --apply`. | **חד-פעמי** (לאחר deploy V41) / idempotent לפי צורך |
| `halacha_batch_reconcile.py` | python | **#82.7** — dedup חוצה-פסקים offline (שמרני, **dry-run בלבד**). dedup-on-insert משווה רק תוך-פסק; כאן סף מחמיר (cosine ≥0.95, `--cosine`) ולא-הרסני: מאתר זוגות הלכות near-duplicate בין פסקים שונים (pgvector `<=>` exact) עם איתות לקסיקלי (Jaccard/Levenshtein) ומדווח ל-CSV ב-`data/audit/` לסקירת היו"ר. לא מדלג/ממזג/מוחק. `--include-pending`. **`--link`** רושם את הזוגות שנמצאו כ-`equivalent_halachot` (parallel authority, #84.2 — **deprecated post-V41** — השתמש ב-`backfill_canonical_halachot.py --apply` במקום). רץ עם venv של mcp-server. | **deprecated** — הוחלף ב-`backfill_canonical_halachot.py` (V41). נשמר לצורכי audit |
| `calibrate_halacha_dedup.py` | python | **#82.1** — כיול ספי ה-dedup הלקסיקלי (#82.3) מול gold-set הניקוי. קורא `halacha-cleanup-manifest-*.csv` (זוגות duplicate↔survivor מתויגי-אדם), טוען טקסט-survivor מה-DB, ו-sweep של (jaccard_min × levenshtein_min) עם P/R/F1, מסמן את נקודת-העבודה המוגדרת. אימת ש-(0.55, 0.70) → **precision 1.0** (אפס false-merge), recall 0.30 — מתאים לאיתות-משני שחוסם auto-approve. `--manifest <path>`. רץ עם venv של mcp-server | חד-פעמי — כיול (בוצע 2026-06-06) |
| `ab_halacha_opus48.py` | python | **A/B לא-הרסני לחילוץ הלכות (Claude)** — מריץ מחדש חילוץ הלכות על פסק-דין בודד דרך מודל/effort נבחרים (`AB_MODEL`/`AB_EFFORT`, ברירת-מחדל `claude-opus-4-8`/`xhigh`) ומשווה לסטטיסטיקות ההלכות הקיימות ב-DB **בלי למחוק/לכתוב כלום**. משכפל את `halacha_extractor.extract()` (אותם פרומפטים, בחירת-צ'אנקים, אימות-ציטוט) ומחליף רק את קריאת ה-LLM ב-`claude -p --model --effort`. מפיק `data/ab_halacha_<case>_<effort>.json`. הרצה: `DOTENV_PATH=/home/chaim/.env DATA_DIR=.../data .venv/bin/python scripts/ab_halacha_opus48.py <case_law_id>`. **ממצא 2026-05-31 (שטיין 1128-08-20):** Opus 4.8@xhigh חילץ 51 מול 124 בייצור (100% quote-verified מול 96%) אך ביטחון מכויל-נמוך יותר (חציון 0.75 מול 0.82) — ולכן **לא** מקטין את תור-האישור-הידני תחת sweep אוטו-אישור conf≥0.78 (26 מול 24). שיפור איכות, לא צמצום-תור. | ידני (החלטת מודל-חילוץ) |
| `ab_halacha_codex.py` | python | **A/B לא-הרסני לחילוץ הלכות (Codex/gpt-5.5)** — עמית ל-`ab_halacha_opus48` אך מחליף את `claude -p` ב-`codex exec --model gpt-5.5` (אימות ChatGPT, ללא OPENAI_API_KEY). אותם פרומפטים ואותו הסקת quote-verification. הפלט האחרון של הסוכן (`-o FILE`) נפענח כ-JSON. `AB_MODEL` (default `gpt-5.5`), `AB_REASONING` low/medium/high/xhigh (default `medium`), `AB_CONCURRENCY` (default 1), `CODEX_BIN`. מפיק `data/ab_halacha_codex_<case>_<model>_<reasoning>.json`. הרצה: `DOTENV_PATH=/home/chaim/.env DATA_DIR=.../data mcp-server/.venv/bin/python scripts/ab_halacha_codex.py <case_law_id>`. **ממצא 2026-06-17 (8181-21 האוניברסיטה העברית):** gpt-5.5@medium חילץ 27 מול 28 של Opus (quote-verified 100%/100%), ביטחון חציון 0.86 מול 0.78 — אך **0 פריטים מתחת ל-0.7** (לעומת 9/28 של Opus = 32%), דבר המצביע על over-confidence. holding↑ (12 מול 7), procedural↓ (4 מול 7). **מסקנה: ריאלי כ-fallback חירום; לא מוכן לייצור ללא כיול-ביטחון.** | ידני (בנצ'מרק מודל codex) |

View File

@@ -0,0 +1,236 @@
#!/usr/bin/env python3
"""Backfill canonical_halachot table from existing halachot + equivalent_halachot.
WHAT THIS DOES
--------------
1. Finds connected components in equivalent_halachot (transitive closure).
2. For each cluster of ≥2 equivalent halachot: picks a canonical representative
(highest approved corroboration, then highest confidence, then earliest created),
creates ONE canonical_halachot row, and sets canonical_id on all cluster members.
3. For singleton halachot (not in any cluster): creates a 1:1 canonical.
4. Updates halacha_citation_corroboration.canonical_id from halachot.canonical_id.
5. Refreshes canonical_halachot.instance_count.
6. Marks cluster non-representative instances as instance_type='citation'.
The backfill sets canonical_statement = representative's rule_statement. A
subsequent LLM synthesis pass (backfill_canonical_synthesis.py, Phase 4) will
replace this with a broader synthesized statement and set review_status='pending_review'.
Until then, review_status stays 'pending_synthesis'.
IDEMPOTENCY
-----------
Halachot with canonical_id already set are skipped. Re-running only fills gaps.
USAGE
-----
cd ~/legal-ai/mcp-server
.venv/bin/python ../scripts/backfill_canonical_halachot.py # dry-run
.venv/bin/python ../scripts/backfill_canonical_halachot.py --apply # execute
.venv/bin/python ../scripts/backfill_canonical_halachot.py --apply --verbose
"""
from __future__ import annotations
import argparse
import asyncio
import os
import sys
from collections import defaultdict
from datetime import datetime, timezone
from uuid import UUID
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "mcp-server", "src"))
from legal_mcp.services import db # noqa: E402
# ── connected-components helpers ──────────────────────────────────────
def _build_components(pairs: list[tuple[UUID, UUID]]) -> list[set[UUID]]:
"""Union-find over (a, b) pairs → list of connected-component sets."""
parent: dict[UUID, UUID] = {}
def find(x: UUID) -> UUID:
while parent.get(x, x) != x:
parent[x] = parent.get(parent.get(x, x), parent.get(x, x))
x = parent.get(x, x)
return x
def union(a: UUID, b: UUID) -> None:
ra, rb = find(a), find(b)
if ra != rb:
parent[rb] = ra
for a, b in pairs:
union(a, b)
groups: dict[UUID, set[UUID]] = defaultdict(set)
all_nodes = {n for pair in pairs for n in pair}
for node in all_nodes:
groups[find(node)].add(node)
return list(groups.values())
# ── main ──────────────────────────────────────────────────────────────
async def _run(apply: bool, verbose: bool) -> None:
pool = await db.get_pool()
async with pool.acquire() as conn:
# ── 1. Load equivalent_halachot pairs ────────────────────────
pair_rows = await conn.fetch(
"SELECT halacha_a, halacha_b FROM equivalent_halachot"
)
pairs: list[tuple[UUID, UUID]] = [(r["halacha_a"], r["halacha_b"]) for r in pair_rows]
components = _build_components(pairs)
clustered_ids: set[UUID] = {h for c in components for h in c}
print(f"equivalent_halachot pairs: {len(pairs)}")
print(f"connected components (clusters ≥2): {len(components)}")
# ── 2. Load all halachot that still need canonical_id ─────────
all_rows = await conn.fetch(
"SELECT h.id, h.rule_statement, h.rule_type, h.practice_areas, "
" h.subject_tags, h.embedding, h.case_law_id, h.confidence, "
" h.review_status, h.created_at, "
" COALESCE(cor.pos, 0) AS corroboration_count "
"FROM halachot h "
"LEFT JOIN ("
" SELECT halacha_id, COUNT(DISTINCT source_citation_id) FILTER "
" (WHERE treatment IN ('followed','explained')) AS pos "
" FROM halacha_citation_corroboration GROUP BY halacha_id"
") cor ON cor.halacha_id = h.id "
"WHERE h.canonical_id IS NULL"
)
pending = {r["id"]: dict(r) for r in all_rows}
print(f"halachot without canonical_id: {len(pending)}")
if not pending:
print("✅ nothing to backfill — all halachot already have canonical_id.")
return
# ── 3. Process clusters ───────────────────────────────────────
def _pick_canonical(members: list[dict]) -> dict:
"""Best representative: highest corroboration → highest confidence → earliest."""
return max(members, key=lambda r: (
1 if r["review_status"] in ("approved", "published") else 0,
r["corroboration_count"],
float(r["confidence"] or 0),
-r["created_at"].timestamp(),
))
canonical_created = 0
halacha_updated = 0
for component in components:
members = [pending[h] for h in component if h in pending]
if not members:
continue # cluster fully backfilled already
rep = _pick_canonical(members)
if verbose:
print(f"\n cluster({len(members)}) rep={rep['id']} "
f"corr={rep['corroboration_count']} "
f"status={rep['review_status']}")
if apply:
canonical_id = await conn.fetchval(
"INSERT INTO canonical_halachot "
"(canonical_statement, rule_type, practice_areas, subject_tags, "
" embedding, first_established_in, review_status, instance_count) "
"VALUES ($1,$2,$3,$4,$5,$6,'pending_synthesis',$7) RETURNING id",
rep["rule_statement"] or "",
rep["rule_type"] or "interpretive",
rep["practice_areas"] or [],
rep["subject_tags"] or [],
rep["embedding"],
rep["case_law_id"],
len(members),
)
canonical_created += 1
for m in members:
itype = "original" if m["id"] == rep["id"] else "citation"
await conn.execute(
"UPDATE halachot SET canonical_id=$1, instance_type=$2, "
"updated_at=now() WHERE id=$3",
canonical_id, itype, m["id"],
)
halacha_updated += 1
# ── 4. Process singletons (no equivalent links) ───────────────
singletons = [r for r in pending.values() if r["id"] not in clustered_ids]
print(f"\nsingletons (no equivalent links): {len(singletons)}")
for r in singletons:
if verbose:
print(f" singleton: {r['id']}")
if apply:
canonical_id = await conn.fetchval(
"INSERT INTO canonical_halachot "
"(canonical_statement, rule_type, practice_areas, subject_tags, "
" embedding, first_established_in, review_status, instance_count) "
"VALUES ($1,$2,$3,$4,$5,$6,'pending_synthesis',1) RETURNING id",
r["rule_statement"] or "",
r["rule_type"] or "interpretive",
r["practice_areas"] or [],
r["subject_tags"] or [],
r["embedding"],
r["case_law_id"],
)
canonical_created += 1
await conn.execute(
"UPDATE halachot SET canonical_id=$1, instance_type='original', "
"updated_at=now() WHERE id=$2",
canonical_id, r["id"],
)
halacha_updated += 1
# ── 5. Backfill halacha_citation_corroboration.canonical_id ───
if apply:
result = await conn.execute(
"UPDATE halacha_citation_corroboration hcc "
"SET canonical_id = h.canonical_id "
"FROM halachot h "
"WHERE hcc.halacha_id = h.id "
" AND hcc.canonical_id IS NULL "
" AND h.canonical_id IS NOT NULL"
)
corr_updated = int(result.split()[-1])
print(f"\ncorroboration rows backfilled: {corr_updated}")
# ── 6. Summary ────────────────────────────────────────────────
if apply:
remaining = await conn.fetchval(
"SELECT COUNT(*) FROM halachot WHERE canonical_id IS NULL"
)
canonical_total = await conn.fetchval(
"SELECT COUNT(*) FROM canonical_halachot"
)
print(f"\n✅ backfill complete")
print(f" canonical_halachot rows: {canonical_total}")
print(f" halachot updated: {halacha_updated}")
print(f" halachot still without canonical_id: {remaining}")
else:
cluster_halachot = sum(
len([m for m in c if m in pending]) for c in components
)
print(f"\n[dry-run] would create:")
print(f" canonical_halachot for {len(components)} clusters "
f"({cluster_halachot} halachot) + {len(singletons)} singletons")
print(f" = ~{len(components) + len(singletons)} canonical principles "
f"from {len(pending)} halachot instances")
print(" Run with --apply to execute.")
def main() -> None:
ap = argparse.ArgumentParser(description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter)
ap.add_argument("--apply", action="store_true",
help="Actually write to DB (default: dry-run, prints only)")
ap.add_argument("--verbose", "-v", action="store_true",
help="Print each cluster/singleton as it is processed")
args = ap.parse_args()
asyncio.run(_run(apply=args.apply, verbose=args.verbose))
if __name__ == "__main__":
main()