# ── V41: canonical_halachot ──────────────────────────────────────────
# Replaces the equivalent_halachot bidirectional-link model (V28) with a
# first-class canonical entity. Instead of recording that halacha A ≡ halacha B,
# we now have ONE canonical_halachot row that BOTH A and B point to.
#
# Each legal PRINCIPLE is defined ONCE here (canonical_statement = LLM-
# synthesized abstraction, grounded in source statements per INV-AH). The
# per-precedent halachot rows become INSTANCES that link to the canonical and
# carry only their own quote, treatment, and context.
#
# Extraction pipeline change (Phase 3, separate PR): lookup-before-insert —
# embed new extraction, cosine-search canonical_halachot (≥0.85); if match,
# store a thin 'citation' instance; if not, create new canonical + 'original'
# instance. This eliminates per-extraction duplication of the same principle.
#
# INV-DM7: authority (binding/persuasive) derived from
#          first_established_in.precedent_level — never stored on canonical.
# INV-G10: only 'published' canonicals reach drafting agents.
# INV-AH:  canonical_statement grounded in source statements, never invented;
#          review_status='pending_synthesis' until chair verifies.
# G2:      equivalent_halachot (V28) deprecated post-backfill (no parallel path).
#
# Deletion policy: every foreign key that points at canonical_halachot uses
# SET NULL. A corroboration row is keyed by its halacha and source citation;
# canonical_id is only a derived aggregation pointer, so removing a canonical
# must reset the pointer rather than erase the audit-trail row itself (the row
# is re-linked by the backfill or by the next store of the same corroboration).
# NOTE(review): ADD COLUMN IF NOT EXISTS will not rewrite the FK action on a
# database that already applied an earlier draft of V41 — confirm none exists.
# NOTE(review): pgvector advises building IVFFlat indexes once the table holds
# data; this index is created on an empty table, so consider a REINDEX after
# the backfill — confirm against the pgvector documentation.
SCHEMA_V41_SQL = """
-- One row per unique legal principle across all precedents.
CREATE TABLE IF NOT EXISTS canonical_halachot (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    canonical_statement TEXT NOT NULL,
    rule_type TEXT NOT NULL DEFAULT 'interpretive',
    practice_areas TEXT[] NOT NULL DEFAULT '{}',
    subject_tags TEXT[] NOT NULL DEFAULT '{}',
    embedding vector(1024),
    review_status TEXT NOT NULL DEFAULT 'pending_synthesis'
        CHECK (review_status IN
            ('pending_synthesis','pending_review','approved','published','rejected')),
    first_established_in UUID REFERENCES case_law(id) ON DELETE SET NULL,
    instance_count INT NOT NULL DEFAULT 0,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS idx_canonical_halachot_status
    ON canonical_halachot(review_status);
CREATE INDEX IF NOT EXISTS idx_canonical_halachot_practice
    ON canonical_halachot USING gin(practice_areas);
CREATE INDEX IF NOT EXISTS idx_canonical_halachot_tags
    ON canonical_halachot USING gin(subject_tags);
CREATE INDEX IF NOT EXISTS idx_canonical_halachot_vec
    ON canonical_halachot USING ivfflat (embedding vector_cosine_ops)
    WITH (lists = 30);

-- halachot: canonical linkage + role columns.
-- canonical_id: NULL until backfill_canonical_halachot.py runs; 100% filled after.
-- instance_type: role of this precedent's mention of the principle.
--   'original'   = the precedent that FIRST established the principle (source)
--   'citation'   = a later precedent that cites/applies the principle
--   'application'= a later precedent that applies the principle to new facts
-- treatment: how this precedent's mention relates to the canonical principle.
--   Parallels halacha_citation_corroboration.treatment (X11) but for precedents
--   (X11 tracks citations from internal decisions; this tracks per-precedent treatment).
-- rule_statement + embedding become nullable: citation instances inherit these
--   from canonical_halachot. 'original' instances keep their own stored values.
ALTER TABLE halachot
    ADD COLUMN IF NOT EXISTS canonical_id UUID
        REFERENCES canonical_halachot(id) ON DELETE SET NULL,
    ADD COLUMN IF NOT EXISTS instance_type TEXT NOT NULL DEFAULT 'original'
        CHECK (instance_type IN ('original','citation','application')),
    ADD COLUMN IF NOT EXISTS treatment TEXT NOT NULL DEFAULT 'mentioned';
ALTER TABLE halachot ALTER COLUMN rule_statement DROP NOT NULL;
ALTER TABLE halachot ALTER COLUMN embedding DROP NOT NULL;
CREATE INDEX IF NOT EXISTS idx_halachot_canonical ON halachot(canonical_id);
CREATE INDEX IF NOT EXISTS idx_halachot_instance_type ON halachot(instance_type);

-- halacha_citation_corroboration (X11) gains canonical_id so the signal
-- aggregates at the principle level rather than the per-instance level.
-- Backfill: UPDATE halacha_citation_corroboration SET canonical_id =
--   (SELECT canonical_id FROM halachot WHERE id = halacha_id).
-- halacha_id is retained for audit trail.
-- The pointer is reset to NULL (not a cascading delete) when a canonical is
-- removed, so the audit-trail rows themselves always survive.
ALTER TABLE halacha_citation_corroboration
    ADD COLUMN IF NOT EXISTS canonical_id UUID
        REFERENCES canonical_halachot(id) ON DELETE SET NULL;
CREATE INDEX IF NOT EXISTS idx_hcc_canonical
    ON halacha_citation_corroboration(canonical_id)
    WHERE canonical_id IS NOT NULL;
"""


# Stable, arbitrary key for the session-level advisory lock that serialises
# schema DDL across processes.
Every short-lived process (cron drains, services) # re-runs the idempotent migrations on startup; without this lock two processes @@ -1602,7 +1686,7 @@ async def _run_schema_migrations(pool: asyncpg.Pool) -> None: await _apply_schema_ddl(conn) finally: await conn.execute("SELECT pg_advisory_unlock($1)", _MIGRATION_LOCK_KEY) - logger.info("Database schema initialized (v1-v40)") + logger.info("Database schema initialized (v1-v41)") async def _apply_schema_ddl(conn: asyncpg.Connection) -> None: @@ -1647,6 +1731,7 @@ async def _apply_schema_ddl(conn: asyncpg.Connection) -> None: await conn.execute(SCHEMA_V38_SQL) await conn.execute(SCHEMA_V39_SQL) await conn.execute(SCHEMA_V40_SQL) + await conn.execute(SCHEMA_V41_SQL) async def init_schema() -> None: @@ -5764,12 +5849,20 @@ async def store_corroboration( s_id = _UUID(source_id) if isinstance(source_id, str) else source_id cl_id = _UUID(citing_case_law_id) if (citing_case_law_id and isinstance(citing_case_law_id, str)) else citing_case_law_id d_id = _UUID(citing_decision_id) if (citing_decision_id and isinstance(citing_decision_id, str)) else citing_decision_id + # INSERT ... SELECT so we can pull canonical_id from halachot in one round-trip. + # canonical_id is NULL until backfill_canonical_halachot.py runs; COALESCE keeps + # existing canonical_id on conflict so a pre-backfill row is upgraded when the + # same corroboration is re-stored post-backfill. 
# ── Canonical halachot (V41) ─────────────────────────────────────────────────

async def create_canonical_halacha(
    statement: str,
    rule_type: str = "interpretive",
    practice_areas: list[str] | None = None,
    subject_tags: list[str] | None = None,
    embedding: list[float] | None = None,
    first_established_in: "UUID | None" = None,
    review_status: str = "pending_synthesis",
) -> "UUID":
    """Create one canonical_halachot row and hand back the generated id.

    Missing `practice_areas` / `subject_tags` are stored as empty arrays; all
    other arguments are passed to the INSERT unchanged.
    """
    insert_sql = (
        "INSERT INTO canonical_halachot "
        "(canonical_statement, rule_type, practice_areas, subject_tags, "
        " embedding, first_established_in, review_status) "
        "VALUES ($1,$2,$3,$4,$5,$6,$7) RETURNING id"
    )
    pool = await get_pool()
    inserted = await pool.fetchrow(
        insert_sql,
        statement,
        rule_type,
        practice_areas or [],
        subject_tags or [],
        embedding,
        first_established_in,
        review_status,
    )
    return inserted["id"]


async def nearest_canonical_halacha(
    vec: list[float],
    threshold: float = 0.85,
    status_filter: tuple[str, ...] = ("approved", "published"),
) -> "tuple[str, float] | None":
    """Find the closest canonical principle to `vec` by cosine distance.

    Only rows that have an embedding and whose review_status is listed in
    `status_filter` are considered. The single nearest row is fetched and is
    returned as (canonical_id, cosine_sim) when its similarity reaches
    `threshold`; otherwise — or when no row matches — the result is None.

    Used by the extractor's lookup-before-insert (Phase 3) to detect whether
    a newly extracted principle already exists in the registry.
    """
    lookup_sql = (
        "SELECT id::text AS id, 1 - (embedding <=> $1) AS sim "
        "FROM canonical_halachot "
        "WHERE embedding IS NOT NULL AND review_status = ANY($2::text[]) "
        "ORDER BY embedding <=> $1 LIMIT 1"
    )
    pool = await get_pool()
    best = await pool.fetchrow(lookup_sql, vec, list(status_filter))
    if not best:
        return None
    similarity = float(best["sim"])
    if similarity >= threshold:
        return (best["id"], similarity)
    return None


async def refresh_canonical_instance_count(canonical_id: "UUID") -> None:
    """Set instance_count to the number of halachot rows linked to this canonical.

    Also bumps the canonical's updated_at to now().
    """
    recount_sql = (
        "UPDATE canonical_halachot SET "
        "instance_count = (SELECT COUNT(*) FROM halachot WHERE canonical_id = $1), "
        "updated_at = now() "
        "WHERE id = $1"
    )
    pool = await get_pool()
    await pool.execute(recount_sql, canonical_id)


async def get_canonical_halacha(canonical_id: "UUID") -> "dict | None":
    """Load a canonical principle together with the halachot that point to it.

    Returns None when the id is unknown. Otherwise the canonical's columns
    (plus the case number of first_established_in, when that case still
    exists) are returned as a dict with an extra "instances" list, ordered by
    instance_type and then case number.
    """
    header_sql = (
        "SELECT ch.id::text, ch.canonical_statement, ch.rule_type, "
        " ch.practice_areas, ch.subject_tags, ch.review_status, "
        " ch.instance_count, ch.created_at, ch.updated_at, "
        " cl.case_number AS first_established_case "
        "FROM canonical_halachot ch "
        "LEFT JOIN case_law cl ON cl.id = ch.first_established_in "
        "WHERE ch.id = $1"
    )
    instances_sql = (
        "SELECT h.id::text, h.instance_type, h.treatment, h.supporting_quote, "
        " h.page_reference, h.review_status AS instance_status, "
        " cl.case_number, cl.case_name "
        "FROM halachot h JOIN case_law cl ON cl.id = h.case_law_id "
        "WHERE h.canonical_id = $1 ORDER BY h.instance_type, cl.case_number"
    )
    pool = await get_pool()
    header = await pool.fetchrow(header_sql, canonical_id)
    if not header:
        return None
    instance_rows = await pool.fetch(instances_sql, canonical_id)
    result = dict(header)
    result["instances"] = [dict(rec) for rec in instance_rows]
    return result
| ידני — לפני חיווט `--apply` (live) / תקופתי — מעקב-לולאה (captured) | | `halacha_rubric_distill.py` | python | **#133/FU-4 — זיקוק-רובריקה PROPOSE-ONLY.** מצליב `halacha_panel_rounds` (FU-1, הצבעות+נימוקים) מול הכרעות-היו"ר (FU-2, seeds ב-`halacha_goldset` batch `chair-live`) דרך `db.panel_rounds_vs_chair` (read-only), מנתח דטרמיניסטית **כשלים שיטתיים** (false-keep/false-drop, פיצולים-שהוכרעו, שיעור-מחלוקת-עם-היו"ר לכל שופט), ומציע `KEEP_SYSTEM` v2 + exemplars מופשטים (claude_session מקומי, אפס עלות) כ**דוח-diff** ל-`data/learning/rubric-proposal-.md`. **לעולם לא auto-apply** — אימוץ v2 = עריכה אנושית של הקבוע דרך PR (INV-LRN1); exemplars מופשטים בלבד (INV-LRN5); הסיגנל היחיד = הכרעת-יו"ר, לא הצבעות-פאנל (anti-echo). מתחת ל-12 זוגות → "אין מספיק נתונים". `--no-llm` (סטטיסטיקה בלבד) / `--limit N`. **חובה מקומי**. | תקופתי — אחרי שהצטברו הכרעות-יו"ר על מחלוקות-פאנל | -| `halacha_batch_reconcile.py` | python | **#82.7** — dedup חוצה-פסקים offline (שמרני, **dry-run בלבד**). dedup-on-insert משווה רק תוך-פסק; כאן סף מחמיר (cosine ≥0.95, `--cosine`) ולא-הרסני: מאתר זוגות הלכות near-duplicate בין פסקים שונים (pgvector `<=>` exact) עם איתות לקסיקלי (Jaccard/Levenshtein) ומדווח ל-CSV ב-`data/audit/` לסקירת היו"ר. לא מדלג/ממזג/מוחק. `--include-pending`. **`--link`** רושם את הזוגות שנמצאו כ-`equivalent_halachot` (parallel authority, #84.2 — קישור-מקביל ברמת-הלכה, **לא** ציטוט; idempotent, לא-הרסני). רץ עם venv של mcp-server. אומת: 800 הלכות → 5 זוגות (קושרו). | ידני — דוח-סקירה / `--link` לקישור | +| `backfill_canonical_halachot.py` | python | **V41 — הקמת מודל ההלכות הקנוניות (חד-פעמי + idempotent).** (1) בונה רכיבים-קשורים (connected components) מ-`equivalent_halachot` (transitive closure — union-find). (2) לכל אשכול: בוחר נציג-קנוני (הכי הרבה corroboration → confidence → earliest), יוצר שורת `canonical_halachot`, ומעדכן `canonical_id` + `instance_type` לכל חברי האשכול. (3) לסינגלטונים (ללא קישורי-שוויון): 1:1 canonical. 
(4) מאכלס `halacha_citation_corroboration.canonical_id` מ-`halachot.canonical_id`. `--dry-run` (ברירת-מחדל, מחשב ומדווח בלבד) / `--apply` (כותב) / `--verbose`. לאחר הרצה: `canonical_statement` = ניסוח-נציג (pending_synthesis); עוקב: `backfill_canonical_synthesis.py` (Phase 4) יסנתז ניסוח-רחב דרך LLM. הרץ: `mcp-server/.venv/bin/python scripts/backfill_canonical_halachot.py --apply`. | **חד-פעמי** (לאחר deploy V41) / idempotent לפי צורך | +| `halacha_batch_reconcile.py` | python | **#82.7** — dedup חוצה-פסקים offline (שמרני, **dry-run בלבד**). dedup-on-insert משווה רק תוך-פסק; כאן סף מחמיר (cosine ≥0.95, `--cosine`) ולא-הרסני: מאתר זוגות הלכות near-duplicate בין פסקים שונים (pgvector `<=>` exact) עם איתות לקסיקלי (Jaccard/Levenshtein) ומדווח ל-CSV ב-`data/audit/` לסקירת היו"ר. לא מדלג/ממזג/מוחק. `--include-pending`. **`--link`** רושם את הזוגות שנמצאו כ-`equivalent_halachot` (parallel authority, #84.2 — **deprecated post-V41** — השתמש ב-`backfill_canonical_halachot.py --apply` במקום). רץ עם venv של mcp-server. | **deprecated** — הוחלף ב-`backfill_canonical_halachot.py` (V41). נשמר לצורכי audit | | `calibrate_halacha_dedup.py` | python | **#82.1** — כיול ספי ה-dedup הלקסיקלי (#82.3) מול gold-set הניקוי. קורא `halacha-cleanup-manifest-*.csv` (זוגות duplicate↔survivor מתויגי-אדם), טוען טקסט-survivor מה-DB, ו-sweep של (jaccard_min × levenshtein_min) עם P/R/F1, מסמן את נקודת-העבודה המוגדרת. אימת ש-(0.55, 0.70) → **precision 1.0** (אפס false-merge), recall 0.30 — מתאים לאיתות-משני שחוסם auto-approve. `--manifest `. רץ עם venv של mcp-server | חד-פעמי — כיול (בוצע 2026-06-06) | | `ab_halacha_opus48.py` | python | **A/B לא-הרסני לחילוץ הלכות (Claude)** — מריץ מחדש חילוץ הלכות על פסק-דין בודד דרך מודל/effort נבחרים (`AB_MODEL`/`AB_EFFORT`, ברירת-מחדל `claude-opus-4-8`/`xhigh`) ומשווה לסטטיסטיקות ההלכות הקיימות ב-DB **בלי למחוק/לכתוב כלום**. משכפל את `halacha_extractor.extract()` (אותם פרומפטים, בחירת-צ'אנקים, אימות-ציטוט) ומחליף רק את קריאת ה-LLM ב-`claude -p --model --effort`. 
מפיק `data/ab_halacha__.json`. הרצה: `DOTENV_PATH=/home/chaim/.env DATA_DIR=.../data .venv/bin/python scripts/ab_halacha_opus48.py `. **ממצא 2026-05-31 (שטיין 1128-08-20):** Opus 4.8@xhigh חילץ 51 מול 124 בייצור (100% quote-verified מול 96%) אך ביטחון מכויל-נמוך יותר (חציון 0.75 מול 0.82) — ולכן **לא** מקטין את תור-האישור-הידני תחת sweep אוטו-אישור conf≥0.78 (26 מול 24). שיפור איכות, לא צמצום-תור. | ידני (החלטת מודל-חילוץ) | | `ab_halacha_codex.py` | python | **A/B לא-הרסני לחילוץ הלכות (Codex/gpt-5.5)** — עמית ל-`ab_halacha_opus48` אך מחליף את `claude -p` ב-`codex exec --model gpt-5.5` (אימות ChatGPT, ללא OPENAI_API_KEY). אותם פרומפטים ואותו הסקת quote-verification. הפלט האחרון של הסוכן (`-o FILE`) נפענח כ-JSON. `AB_MODEL` (default `gpt-5.5`), `AB_REASONING` low/medium/high/xhigh (default `medium`), `AB_CONCURRENCY` (default 1), `CODEX_BIN`. מפיק `data/ab_halacha_codex___.json`. הרצה: `DOTENV_PATH=/home/chaim/.env DATA_DIR=.../data mcp-server/.venv/bin/python scripts/ab_halacha_codex.py `. **ממצא 2026-06-17 (8181-21 האוניברסיטה העברית):** gpt-5.5@medium חילץ 27 מול 28 של Opus (quote-verified 100%/100%), ביטחון חציון 0.86 מול 0.78 — אך **0 פריטים מתחת ל-0.7** (לעומת 9/28 של Opus = 32%), דבר המצביע על over-confidence. holding↑ (12 מול 7), procedural↓ (4 מול 7). **מסקנה: ריאלי כ-fallback חירום; לא מוכן לייצור ללא כיול-ביטחון.** | ידני (בנצ'מרק מודל codex) | diff --git a/scripts/backfill_canonical_halachot.py b/scripts/backfill_canonical_halachot.py new file mode 100644 index 0000000..91bef86 --- /dev/null +++ b/scripts/backfill_canonical_halachot.py @@ -0,0 +1,236 @@ +#!/usr/bin/env python3 +"""Backfill canonical_halachot table from existing halachot + equivalent_halachot. + +WHAT THIS DOES +-------------- +1. Finds connected components in equivalent_halachot (transitive closure). +2. 
# ── connected-components helpers ──────────────────────────────────────

def _build_components(pairs: list[tuple[UUID, UUID]]) -> list[set[UUID]]:
    """Group the nodes of undirected (a, b) links into connected components.

    Union-find with path compression. Every node that appears in `pairs`
    ends up in exactly one returned set; a self-pair (a, a) yields a
    one-element component. An empty input yields an empty list.
    """
    parent: dict[UUID, UUID] = {}

    def find(node: UUID) -> UUID:
        root = parent.setdefault(node, node)
        while parent[root] != root:
            root = parent[root]
        # Second pass: point every node on the walked path straight at the root.
        while parent[node] != root:
            parent[node], node = root, parent[node]
        return root

    for a, b in pairs:
        root_a, root_b = find(a), find(b)
        if root_a != root_b:
            parent[root_b] = root_a

    groups: dict[UUID, set[UUID]] = defaultdict(set)
    # list(): find() rewrites parent values while we walk the keys.
    for node in list(parent):
        groups[find(node)].add(node)
    return list(groups.values())


def _pick_canonical(members: list[dict]) -> dict:
    """Choose the representative row of a cluster.

    Priority: approved/published review_status → highest corroboration_count
    → highest confidence (NULL counts as 0) → earliest created_at (a missing
    created_at ranks last).
    """
    def rank(row: dict) -> tuple:
        created = row["created_at"]
        return (
            row["review_status"] in ("approved", "published"),
            row["corroboration_count"],
            float(row["confidence"] or 0),
            -created.timestamp() if created is not None else float("-inf"),
        )

    return max(members, key=rank)


async def _insert_canonical(conn, rep: dict, instance_count: int) -> UUID:
    """Insert a 'pending_synthesis' canonical seeded from `rep`; return its id."""
    return await conn.fetchval(
        "INSERT INTO canonical_halachot "
        "(canonical_statement, rule_type, practice_areas, subject_tags, "
        " embedding, first_established_in, review_status, instance_count) "
        "VALUES ($1,$2,$3,$4,$5,$6,'pending_synthesis',$7) RETURNING id",
        rep["rule_statement"] or "",
        rep["rule_type"] or "interpretive",
        rep["practice_areas"] or [],
        rep["subject_tags"] or [],
        rep["embedding"],
        rep["case_law_id"],
        instance_count,
    )


async def _link_instance(conn, halacha_id: UUID, canonical_id: UUID,
                         instance_type: str) -> None:
    """Point one halachot row at its canonical and record its instance role."""
    await conn.execute(
        "UPDATE halachot SET canonical_id=$1, instance_type=$2, "
        "updated_at=now() WHERE id=$3",
        canonical_id, instance_type, halacha_id,
    )


# ── main ──────────────────────────────────────────────────────────────

async def _run(apply: bool, verbose: bool) -> None:
    """Backfill canonical_halachot for every halachot row lacking canonical_id.

    Args:
        apply: when False (dry-run) nothing is written; the plan is printed.
        verbose: print each cluster / singleton as it is processed.

    All writes happen inside one transaction, so a failure part-way through
    leaves no canonical row without members. A cluster in which some members
    already carry a canonical_id is not given a second canonical: its
    remaining members are attached to the existing one as 'citation'
    instances and that canonical's instance_count is recounted.
    """
    pool = await db.get_pool()

    async with pool.acquire() as conn, conn.transaction():
        # ── 1. Load equivalent_halachot pairs ────────────────────────
        pair_rows = await conn.fetch(
            "SELECT halacha_a, halacha_b FROM equivalent_halachot"
        )
        pairs: list[tuple[UUID, UUID]] = [
            (r["halacha_a"], r["halacha_b"]) for r in pair_rows
        ]
        components = _build_components(pairs)
        clustered_ids: set[UUID] = {h for c in components for h in c}
        print(f"equivalent_halachot pairs: {len(pairs)}")
        print(f"connected components (clusters ≥2): {len(components)}")

        # ── 2. Load all halachot that still need canonical_id ─────────
        all_rows = await conn.fetch(
            "SELECT h.id, h.rule_statement, h.rule_type, h.practice_areas, "
            "       h.subject_tags, h.embedding, h.case_law_id, h.confidence, "
            "       h.review_status, h.created_at, "
            "       COALESCE(cor.pos, 0) AS corroboration_count "
            "FROM halachot h "
            "LEFT JOIN ("
            "    SELECT halacha_id, COUNT(DISTINCT source_citation_id) FILTER "
            "           (WHERE treatment IN ('followed','explained')) AS pos "
            "    FROM halacha_citation_corroboration GROUP BY halacha_id"
            ") cor ON cor.halacha_id = h.id "
            "WHERE h.canonical_id IS NULL"
        )
        pending = {r["id"]: dict(r) for r in all_rows}
        print(f"halachot without canonical_id: {len(pending)}")

        if not pending:
            print("✅ nothing to backfill — all halachot already have canonical_id.")
            return

        # Cluster members that were linked by an earlier run (or by a later
        # equivalence link to an already-backfilled halacha).
        assigned_rows = await conn.fetch(
            "SELECT id, canonical_id FROM halachot "
            "WHERE canonical_id IS NOT NULL AND id = ANY($1::uuid[])",
            list(clustered_ids),
        )
        assigned = {r["id"]: r["canonical_id"] for r in assigned_rows}

        # ── 3. Process clusters ───────────────────────────────────────
        halacha_updated = 0
        new_clusters = 0          # clusters that get a brand-new canonical
        new_cluster_halachot = 0  # pending halachot inside those clusters
        attached = 0              # pending halachot joining an existing canonical
        touched: set[UUID] = set()  # existing canonicals that gained members

        for component in components:
            members = [pending[h] for h in component if h in pending]
            if not members:
                continue  # cluster fully backfilled already

            existing = [assigned[h] for h in component if h in assigned]
            if existing:
                # Deterministic choice: the canonical most members already use.
                target = max(sorted(set(existing), key=str), key=existing.count)
                if len(set(existing)) > 1:
                    print(f"  ⚠ cluster spans {len(set(existing))} canonicals; "
                          f"attaching to {target} — merge needs manual review")
                if verbose:
                    print(f"\n  cluster(+{len(members)}) "
                          f"joins existing canonical={target}")
                attached += len(members)
                if apply:
                    for m in members:
                        await _link_instance(conn, m["id"], target, "citation")
                        halacha_updated += 1
                    touched.add(target)
                continue

            rep = _pick_canonical(members)
            new_clusters += 1
            new_cluster_halachot += len(members)
            if verbose:
                print(f"\n  cluster({len(members)}) rep={rep['id']} "
                      f"corr={rep['corroboration_count']} "
                      f"status={rep['review_status']}")

            if apply:
                canonical_id = await _insert_canonical(conn, rep, len(members))
                for m in members:
                    itype = "original" if m["id"] == rep["id"] else "citation"
                    await _link_instance(conn, m["id"], canonical_id, itype)
                    halacha_updated += 1

        if apply and touched:
            await conn.execute(
                "UPDATE canonical_halachot ch SET "
                "instance_count = (SELECT COUNT(*) FROM halachot h "
                "                  WHERE h.canonical_id = ch.id), "
                "updated_at = now() "
                "WHERE ch.id = ANY($1::uuid[])",
                list(touched),
            )

        # ── 4. Process singletons (no equivalent links) ───────────────
        singletons = [r for r in pending.values() if r["id"] not in clustered_ids]
        print(f"\nsingletons (no equivalent links): {len(singletons)}")

        for r in singletons:
            if verbose:
                print(f"  singleton: {r['id']}")
            if apply:
                canonical_id = await _insert_canonical(conn, r, 1)
                await _link_instance(conn, r["id"], canonical_id, "original")
                halacha_updated += 1

        # ── 5. Backfill halacha_citation_corroboration.canonical_id ───
        if apply:
            result = await conn.execute(
                "UPDATE halacha_citation_corroboration hcc "
                "SET canonical_id = h.canonical_id "
                "FROM halachot h "
                "WHERE hcc.halacha_id = h.id "
                "  AND hcc.canonical_id IS NULL "
                "  AND h.canonical_id IS NOT NULL"
            )
            # asyncpg returns the command tag, e.g. "UPDATE 42".
            corr_updated = int(result.split()[-1])
            print(f"\ncorroboration rows backfilled: {corr_updated}")

        # ── 6. Summary ────────────────────────────────────────────────
        if apply:
            remaining = await conn.fetchval(
                "SELECT COUNT(*) FROM halachot WHERE canonical_id IS NULL"
            )
            canonical_total = await conn.fetchval(
                "SELECT COUNT(*) FROM canonical_halachot"
            )
            print("\n✅ backfill complete")
            print(f"   canonical_halachot rows: {canonical_total}")
            print(f"   halachot updated: {halacha_updated}")
            print(f"   halachot attached to existing canonicals: {attached}")
            print(f"   halachot still without canonical_id: {remaining}")
        else:
            print("\n[dry-run] would create:")
            print(f"  canonical_halachot for {new_clusters} clusters "
                  f"({new_cluster_halachot} halachot) + {len(singletons)} singletons")
            print(f"  = ~{new_clusters + len(singletons)} canonical principles "
                  f"from {len(pending)} halachot instances")
            if attached:
                print(f"  and attach {attached} halachot to canonicals "
                      f"their cluster already has")
            print("  Run with --apply to execute.")


def main() -> None:
    """Parse --apply / --verbose and run the backfill."""
    ap = argparse.ArgumentParser(description=__doc__,
                                 formatter_class=argparse.RawDescriptionHelpFormatter)
    ap.add_argument("--apply", action="store_true",
                    help="Actually write to DB (default: dry-run, prints only)")
    ap.add_argument("--verbose", "-v", action="store_true",
                    help="Print each cluster/singleton as it is processed")
    args = ap.parse_args()
    asyncio.run(_run(apply=args.apply, verbose=args.verbose))


if __name__ == "__main__":
    main()