diff --git a/.taskmaster/tasks/tasks.json b/.taskmaster/tasks/tasks.json index 6df01e1..3b9896e 100644 --- a/.taskmaster/tasks/tasks.json +++ b/.taskmaster/tasks/tasks.json @@ -2056,9 +2056,9 @@ }, { "id": "60", - "title": "[FU-2] ingest idempotent + מזהים קנוניים", - "description": "מפתח-upsert דטרמיניסטי + נרמול case_number בכתיבה + תיאום מספרים מעורבים.", - "details": "מכסה GAP-03,06,07,08,13. מספק INV-ING2/G3/G1/ID1/ID2/DM2/DM1. severity: Critical. סוג: קוד + מיגרציית-נתונים (דורש אישור-עלות). תלוי ב-FU-1.", + "title": "[FU-2a] ingest idempotent + נרמול-בכתיבה + searchable (pure-code)", + "description": "upsert ON CONFLICT על מפתח קנוני + נרמול case_number בכתיבה (type-aware) + דגל searchable מפורש. אפס מיגרציית-נתונים.", + "details": "מכסה GAP-03,06,13. מספק INV-ING2/G3/G1/ID1/DM1. severity: Critical. סוג: pure-code (schema-additive). תלוי ב-FU-1 (#59). FU-2b (#67) מטפל ב-GAP-07/08 בנפרד.", "testStrategy": "", "status": "pending", "dependencies": [ @@ -2086,26 +2086,6 @@ "testStrategy": "", "parentId": "60" }, - { - "id": 3, - "title": "[GAP-07] תיאום מספרי-תיק מעורבים (with-month canonical)", - "description": "מיגרציה חד-פעמית; הצורה עם-חודש קנונית (החלטת-יו\"ר).", - "dependencies": [], - "details": "INV-ID1", - "status": "pending", - "testStrategy": "", - "parentId": "60" - }, - { - "id": 4, - "title": "[GAP-08] הסרת ציטוט-מלא כ-case_number", - "description": "רשומות עם ציטוט מלא כמזהה (legacy).", - "dependencies": [], - "details": "INV-DM2/ID2", - "status": "pending", - "testStrategy": "", - "parentId": "60" - }, { "id": 5, "title": "[GAP-13] שדה searchable מפורש", @@ -2358,6 +2338,40 @@ } ], "updatedAt": "2026-05-30T17:37:34.741136+00:00" + }, + { + "id": "67", + "title": "[FU-2b] תיאום מזהים קנוניים + ניקוי ציטוט-כמזהה (data-migration, chair)", + "description": "מיגרציה חד-פעמית של ~52+ רשומות case_law עם ציטוט-מלא ב-case_number → מספר-בסיס מנורמל; dedup (למשל 8047-23 כפול); הכרעת צורה קנונית per-record.", + "details": "מכסה GAP-07,08. מספק INV-ID1/ID2/DM2. severity: High. סוג: DATA-MIGRATION + chair-decision (מספר רשמי per-record, with-month canonical). דורש: גיבוי, dry-run, סקירת-יו\"ר, reversibility. תלוי ב-FU-2a (#60, לצורך פונקציית הנרמול). מקור: בדיקת DB 2026-05-30 — internal_committee ~52/56 ציטוט-מלא, ≥1 dup (8047-23), 1 בלתי-פתיר (ערר אדלר/cited_only).", + "testStrategy": "", + "status": "pending", + "dependencies": [ + "60" + ], + "priority": "high", + "subtasks": [ + { + "id": 1, + "title": "[GAP-07] תיאום מספרי-תיק מעורבים (with-month canonical)", + "description": "מיגרציה חד-פעמית; הצורה עם-חודש קנונית (החלטת-יו\"ר).", + "dependencies": [], + "details": "INV-ID1", + "status": "pending", + "testStrategy": "", + "parentId": "67" + }, + { + "id": 2, + "title": "[GAP-08] הסרת ציטוט-מלא כ-case_number", + "description": "רשומות עם ציטוט מלא כמזהה (legacy).", + "dependencies": [], + "details": "INV-DM2/ID2", + "status": "pending", + "testStrategy": "", + "parentId": "67" + } + ] } ], "metadata": { diff --git a/docs/superpowers/plans/2026-05-30-fu2a-idempotent-ingest.md b/docs/superpowers/plans/2026-05-30-fu2a-idempotent-ingest.md new file mode 100644 index 0000000..26809ed --- /dev/null +++ b/docs/superpowers/plans/2026-05-30-fu2a-idempotent-ingest.md @@ -0,0 +1,613 @@ +# FU-2a: Idempotent Ingest + Write-Time Normalization + `searchable` Flag — Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Make ingest idempotent (`ON CONFLICT` upsert), normalize identifiers at the write boundary (type-aware), and add a materialized `searchable` flag — all forward-only, no identifier migration. + +**Architecture:** Pure-code + one schema-additive migration (V21) in `db.py`. The two `create_*_case_law` functions move from app-level SELECT-then-INSERT/UPDATE to atomic `INSERT … ON CONFLICT … DO UPDATE` against the existing V15 partial unique indexes (predicate repeated). A new `_canonical_case_number` normalizes at write for identifier-keyed corpora (internal/cases), not for external (citation is its id). A new `searchable` boolean is recomputed from the completeness contract on ingest/metadata completion; the search-layer filter is gated behind a dry-run. + +**Tech Stack:** Python 3.12, asyncpg, PostgreSQL (pgvector) at localhost:5433, pytest offline, local `.venv` at `mcp-server/.venv`. + +**Spec:** [docs/superpowers/specs/2026-05-30-fu2a-idempotent-ingest-design.md](../specs/2026-05-30-fu2a-idempotent-ingest-design.md) + +**Run tests:** `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_idempotent_ingest.py -v` +**DB smoke (real Postgres):** source `~/.env`, connect to `localhost:5433` db `legal_ai` (see Task 6). + +--- + +## File Structure + +- **Modify** `mcp-server/src/legal_mcp/services/db.py`: + - add `_canonical_case_number(s)` (pure) near `_normalize_case_number` (~line 1196). + - add pure `_compute_searchable(row, has_embedded_chunk)` + async `recompute_searchable(...)`. + - add `SCHEMA_V21_SQL` (after V20, ~line 1094) + wire into `_run_schema_migrations` (~line 1119). + - normalize at write in `create_case`, `create_internal_committee_decision` (NOT `create_external_case_law`). + - convert `create_external_case_law` + `create_internal_committee_decision` to `ON CONFLICT … DO UPDATE`. +- **Modify** `mcp-server/src/legal_mcp/services/ingest.py`: call `db.recompute_searchable(case_law_id)` after statuses are set (uniform, both types). +- **Modify** the search layer (`services/hybrid_search.py` and/or `db.py` search functions) — gated `searchable = true` filter (Task 6, only if dry-run is clean). +- **Create** `mcp-server/tests/test_idempotent_ingest.py` — offline tests for the pure pieces + ingest wiring. + +**Unchanged:** public signatures of `ingest_precedent`/`ingest_internal_decision` (FU-1) and the DB-create parameter lists. Normalization/upsert live inside the write boundary. + +--- + +## Task 1: Failing tests (pure logic + ingest wiring) + +**Files:** Create `mcp-server/tests/test_idempotent_ingest.py` + +- [ ] **Step 1: Write the failing tests** + +```python +"""FU-2a: idempotent ingest + write-time normalization + searchable flag. + +Offline tests for the *pure* pieces (canonical normalization, completeness +predicate) and ingest wiring. The real ON CONFLICT upsert is verified by a +DB smoke test against localhost:5433 (see plan Task 6), since it requires a +live Postgres partial unique index. +""" +from __future__ import annotations + +import asyncio +from uuid import uuid4 + +import pytest + +from legal_mcp.services import db, ingest + + +def _run(coro): + return asyncio.run(coro) + + +# ── GAP-06: canonical normalization (pure, deterministic) ────────────── +@pytest.mark.parametrize("raw,expected", [ + ("ערר 8137/24", "8137-24"), + (" עע\"מ 1/20 ", "1-20"), + ("8126-03-25", "8126-03-25"), # month segment preserved + ("בל\"מ 1010-01-25", "1010-01-25"), + ("8047/23", "8047-23"), +]) +def test_canonical_case_number(raw, expected): + assert db._canonical_case_number(raw) == expected + + +def test_canonical_does_not_invent_month(): + # No month in input → none added (X1 §1). + assert db._canonical_case_number("8126/24") == "8126-24" + + +# ── GAP-13: completeness predicate (pure) ────────────────────────────── +def _complete_row(): + return { + "case_number": "8047-23", "case_name": "פלוני נ' הוועדה", + "practice_area": "rishuy_uvniya", "source_kind": "internal_committee", + "extraction_status": "completed", "headnote": "תקציר", + "summary": "", "subject_tags": [], + } + + +def test_compute_searchable_true_when_complete(): + assert db._compute_searchable(_complete_row(), has_embedded_chunk=True) is True + + +def test_compute_searchable_false_without_embedded_chunk(): + assert db._compute_searchable(_complete_row(), has_embedded_chunk=False) is False + + +def test_compute_searchable_false_without_metadata(): + row = _complete_row() + row["headnote"] = ""; row["summary"] = ""; row["subject_tags"] = [] + assert db._compute_searchable(row, has_embedded_chunk=True) is False + + +def test_compute_searchable_false_when_extraction_incomplete(): + row = _complete_row(); row["extraction_status"] = "pending" + assert db._compute_searchable(row, has_embedded_chunk=True) is False + + +def test_compute_searchable_false_without_core_fields(): + row = _complete_row(); row["practice_area"] = "" + assert db._compute_searchable(row, has_embedded_chunk=True) is False + + +# ── ingest wires in recompute_searchable (both types) ────────────────── +def test_ingest_calls_recompute_searchable(monkeypatch, tmp_path): + calls = {"recompute": [], "meta": [], "hal": []} + + async def _extract_text(path): return ("text", 1, [0]) + monkeypatch.setattr(ingest.extractor, "extract_text", _extract_text) + monkeypatch.setattr(ingest.extractor, "strip_nevo_preamble", lambda t: t) + monkeypatch.setattr(ingest.chunker, "chunk_document", + lambda t, page_offsets=None: [type("C", (), { + "chunk_index": 0, "content": "c", "section_type": "b", + "page_number": 1})()]) + + async def _embed(texts, input_type="document"): return [[0.0] * 8 for _ in texts] + monkeypatch.setattr(ingest.embeddings, "embed_texts", _embed) + + async def _store(cid, dicts): return len(dicts) + monkeypatch.setattr(ingest.db, "store_precedent_chunks", _store) + + async def _create_internal(**kw): return {"id": uuid4()} + monkeypatch.setattr(ingest.db, "create_internal_committee_decision", _create_internal) + + async def _noop(*a, **k): return None + monkeypatch.setattr(ingest.db, "set_case_law_extraction_status", _noop) + monkeypatch.setattr(ingest.db, "set_case_law_halacha_status", _noop) + monkeypatch.setattr(ingest.db, "request_metadata_extraction", + lambda cid: calls["meta"].append(cid) or _noop()) + monkeypatch.setattr(ingest.db, "request_halacha_extraction", + lambda cid: calls["hal"].append(cid) or _noop()) + + async def _recompute(cid): calls["recompute"].append(cid) + monkeypatch.setattr(ingest.db, "recompute_searchable", _recompute) + monkeypatch.setattr(ingest.config, "PARENT_DOC_RETRIEVAL_ENABLED", False) + monkeypatch.setattr(ingest.config, "MULTIMODAL_ENABLED", False) + + from legal_mcp.services import internal_decisions + _run(internal_decisions.ingest_internal_decision( + case_number="8047/23", text="t", chair_name="x", practice_area="rishuy_uvniya")) + assert len(calls["recompute"]) == 1, "ingest must recompute searchable after success" +``` + +- [ ] **Step 2: Run to verify failure** + +Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_idempotent_ingest.py -v` +Expected: FAIL — `AttributeError: module 'legal_mcp.services.db' has no attribute '_canonical_case_number'` (and `_compute_searchable`, `recompute_searchable`). + +- [ ] **Step 3: Commit** + +```bash +cd ~/legal-ai +git add mcp-server/tests/test_idempotent_ingest.py +git commit -m "test(ingest): failing tests for idempotent ingest + searchable (FU-2a)" +``` + +--- + +## Task 2: `_canonical_case_number` + write-time normalization + +**Files:** Modify `mcp-server/src/legal_mcp/services/db.py` + +- [ ] **Step 1: Add `_canonical_case_number` next to `_normalize_case_number` (~line 1212)** + +```python +def _canonical_case_number(s: str) -> str: + """Canonical write-time form per X1 §1: trim · prefix-strip · '/'→'-'. + + Deterministic and format-only — does NOT add or remove a month segment. + Used at the write boundary for identifier-keyed corpora (internal + committee decisions, active cases). NOT for external precedents, whose + canonical identifier is the full citation. + """ + s = (s or "").strip() + m = re.search(r"\d", s) + if m: + s = s[m.start():] + return s.strip().replace("/", "-") +``` + +- [ ] **Step 2: Normalize at write in `create_case` (~line 1158)** + +Change the INSERT's `case_number` binding to normalized form. Replace `case_id, case_number, title,` with: + +```python + case_id, _canonical_case_number(case_number), title, +``` + +- [ ] **Step 3: Normalize at write in `create_internal_committee_decision` (top of function body, ~line 2649)** + +Immediately after `pool = await get_pool()`, add: + +```python + case_number = _canonical_case_number(case_number) +``` + +(Do NOT add this to `create_external_case_law` — external keeps its citation verbatim; that function only `.strip()`s, which the caller adapter already does.) + +- [ ] **Step 4: Run normalization tests** + +Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_idempotent_ingest.py -k "canonical" -v` +Expected: `test_canonical_case_number` (5 cases) + `test_canonical_does_not_invent_month` PASS. + +- [ ] **Step 5: Commit** + +```bash +cd ~/legal-ai +git add mcp-server/src/legal_mcp/services/db.py +git commit -m "feat(ingest): write-time canonical case_number normalization (GAP-06, FU-2a)" +``` + +--- + +## Task 3: Convert both create functions to `ON CONFLICT DO UPDATE` + +**Files:** Modify `mcp-server/src/legal_mcp/services/db.py` + +- [ ] **Step 1: Replace `create_external_case_law` body (lines 2566-2624, from `pool = await get_pool()` to `return _row_to_case_law(row)`)** + +```python + pool = await get_pool() + tags_json = json.dumps(subject_tags or [], ensure_ascii=False) + async with pool.acquire() as conn: + # Atomic upsert on the V15 partial unique index + # uq_case_law_external_number (case_number) WHERE source_kind <> 'internal_committee'. + # The predicate is repeated in ON CONFLICT (required for partial indexes). + # This also subsumes the old cited_only→external_upload promotion: a + # cited_only row with the same case_number conflicts and is promoted by + # DO UPDATE. Scoped to the external partial index, so an internal row with + # the same number is NOT touched (the old SELECT-without-source_kind could + # wrongly promote it). + row = await conn.fetchrow( + """ + INSERT INTO case_law ( + case_number, case_name, court, date, subject_tags, + summary, key_quote, full_text, source_url, + source_kind, document_id, extraction_status, + halacha_extraction_status, practice_area, appeal_subtype, + headnote, source_type, precedent_level, is_binding + ) VALUES ( + $1, $2, $3, $4, $5, $6, $7, $8, $9, + 'external_upload', $10, 'processing', 'pending', + $11, $12, $13, $14, $15, $16 + ) + ON CONFLICT (case_number) WHERE source_kind <> 'internal_committee' + DO UPDATE SET + case_name = EXCLUDED.case_name, + court = COALESCE(NULLIF(EXCLUDED.court, ''), case_law.court), + date = COALESCE(EXCLUDED.date, case_law.date), + practice_area = EXCLUDED.practice_area, + appeal_subtype = EXCLUDED.appeal_subtype, + subject_tags = EXCLUDED.subject_tags, + summary = COALESCE(NULLIF(EXCLUDED.summary, ''), case_law.summary), + headnote = EXCLUDED.headnote, + key_quote = COALESCE(NULLIF(EXCLUDED.key_quote, ''), case_law.key_quote), + full_text = EXCLUDED.full_text, + source_url = COALESCE(NULLIF(EXCLUDED.source_url, ''), case_law.source_url), + source_type = EXCLUDED.source_type, + precedent_level = EXCLUDED.precedent_level, + is_binding = EXCLUDED.is_binding, + document_id = COALESCE(EXCLUDED.document_id, case_law.document_id), + source_kind = 'external_upload', + extraction_status = 'processing', + halacha_extraction_status = 'pending' + RETURNING * + """, + case_number, case_name, court, decision_date, tags_json, + summary, key_quote, full_text, source_url, + document_id, practice_area, appeal_subtype, headnote, + source_type, precedent_level, is_binding, + ) + return _row_to_case_law(row) +``` + +- [ ] **Step 2: Replace `create_internal_committee_decision` body (lines 2649-2708)** + +```python + pool = await get_pool() + case_number = _canonical_case_number(case_number) + tags_json = json.dumps(subject_tags or [], ensure_ascii=False) + async with pool.acquire() as conn: + # Atomic upsert on V15 partial unique index + # uq_case_law_internal_number_proc (case_number, proceeding_type) + # WHERE source_kind = 'internal_committee'. Predicate repeated for the + # partial index. Replaces the old SELECT-then-INSERT/UPDATE (race-prone). + row = await conn.fetchrow( + """ + INSERT INTO case_law ( + case_number, case_name, court, date, chair_name, district, + subject_tags, summary, full_text, + source_kind, source_type, document_id, + extraction_status, halacha_extraction_status, + practice_area, appeal_subtype, is_binding, proceeding_type + ) VALUES ( + $1, $2, $3, $4, $5, $6, + $7, $8, $9, + 'internal_committee', 'appeals_committee', $10, + 'processing', 'pending', + $11, $12, $13, $14 + ) + ON CONFLICT (case_number, proceeding_type) + WHERE source_kind = 'internal_committee' + DO UPDATE SET + case_name = EXCLUDED.case_name, + court = COALESCE(NULLIF(EXCLUDED.court, ''), case_law.court), + date = COALESCE(EXCLUDED.date, case_law.date), + chair_name = COALESCE(NULLIF(EXCLUDED.chair_name, ''), case_law.chair_name), + district = COALESCE(NULLIF(EXCLUDED.district, ''), case_law.district), + practice_area = EXCLUDED.practice_area, + appeal_subtype = EXCLUDED.appeal_subtype, + subject_tags = EXCLUDED.subject_tags, + summary = COALESCE(NULLIF(EXCLUDED.summary, ''), case_law.summary), + full_text = EXCLUDED.full_text, + source_type = 'appeals_committee', + source_kind = 'internal_committee', + is_binding = EXCLUDED.is_binding, + document_id = COALESCE(EXCLUDED.document_id, case_law.document_id), + extraction_status = 'processing', + halacha_extraction_status = 'pending' + RETURNING * + """, + case_number, case_name, court, decision_date, chair_name, district, + tags_json, summary, full_text, + document_id, practice_area, appeal_subtype, is_binding, + proceeding_type, + ) + return _row_to_case_law(row) +``` + +- [ ] **Step 3: Verify import + no syntax error** + +Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -c "from legal_mcp.services import db; print('db imports')"` +Expected: prints `db imports`. + +- [ ] **Step 4: Commit** + +```bash +cd ~/legal-ai +git add mcp-server/src/legal_mcp/services/db.py +git commit -m "feat(ingest): atomic ON CONFLICT upsert in create_*_case_law (GAP-03, FU-2a)" +``` + +--- + +## Task 4: V21 migration — `searchable` column + recompute + +**Files:** Modify `mcp-server/src/legal_mcp/services/db.py` + +- [ ] **Step 1: Add `SCHEMA_V21_SQL` after `SCHEMA_V20_SQL` (~line 1094)** + +```python +# ── V21: explicit `searchable` flag (GAP-13 / INV-DM1) ───────────── +# Materialized completeness flag — a case_law row is exposed to search only +# when it satisfies the completeness contract (02-data-model §2a). Recomputed +# on ingest/metadata completion via recompute_searchable(); not inferred at +# query time. Default false so a freshly-inserted row is excluded until proven +# complete. Health-check surfaces count(*) FILTER (WHERE NOT searchable). +SCHEMA_V21_SQL = """ +ALTER TABLE case_law ADD COLUMN IF NOT EXISTS searchable boolean NOT NULL DEFAULT false; +CREATE INDEX IF NOT EXISTS idx_case_law_searchable ON case_law (searchable); +""" +``` + +- [ ] **Step 2: Wire V21 into `_run_schema_migrations` (~line 1119) and bump the log line** + +After `await conn.execute(SCHEMA_V20_SQL)` add: + +```python + await conn.execute(SCHEMA_V21_SQL) +``` + +Change the log line `"Database schema initialized (v1-v20)"` → `"Database schema initialized (v1-v21)"`. + +- [ ] **Step 3: Add `_compute_searchable` (pure) + `recompute_searchable` (async) near the case_law helpers (after `create_internal_committee_decision`, ~line 2709)** + +```python +def _compute_searchable(row: dict, has_embedded_chunk: bool) -> bool: + """Completeness contract (INV-DM1 / 02-data-model §2a). + + A row is searchable IFF: canonical id present · case_name/practice_area/ + source_kind present · ≥1 chunk with a non-null embedding · extraction + completed · metadata non-empty (≥1 of headnote/summary/subject_tags). + Pure — `has_embedded_chunk` is supplied by the caller (cross-table check). + """ + if not has_embedded_chunk: + return False + if (row.get("extraction_status") or "") != "completed": + return False + if not (row.get("case_number") or "").strip(): + return False + if not (row.get("case_name") or "").strip(): + return False + if not (row.get("practice_area") or "").strip(): + return False + if not (row.get("source_kind") or "").strip(): + return False + tags = row.get("subject_tags") or [] + has_meta = bool((row.get("headnote") or "").strip()) \ + or bool((row.get("summary") or "").strip()) \ + or (len(tags) > 0) + return has_meta + + +async def recompute_searchable(case_law_id: "UUID | str | None" = None) -> int: + """Recompute and persist the `searchable` flag. Idempotent / reversible. + + If case_law_id is None, recompute ALL rows (used by the V21 backfill and + the dry-run). Returns the number of rows now marked searchable=true. + """ + pool = await get_pool() + async with pool.acquire() as conn: + if case_law_id is not None: + cid = case_law_id if isinstance(case_law_id, UUID) else UUID(str(case_law_id)) + rows = await conn.fetch( + "SELECT * FROM case_law WHERE id = $1", cid) + else: + rows = await conn.fetch("SELECT * FROM case_law") + n_true = 0 + for r in rows: + row = dict(r) + # subject_tags is stored jsonb; _row_to_case_law parses it, but here + # we read raw — normalize to a list length check. + tags = row.get("subject_tags") + if isinstance(tags, str): + try: + tags = json.loads(tags) + except (ValueError, TypeError): + tags = [] + row["subject_tags"] = tags or [] + has_chunk = await conn.fetchval( + "SELECT EXISTS(SELECT 1 FROM precedent_chunks " + "WHERE case_law_id = $1 AND embedding IS NOT NULL)", row["id"]) + val = _compute_searchable(row, bool(has_chunk)) + await conn.execute( + "UPDATE case_law SET searchable = $2 WHERE id = $1", row["id"], val) + if val: + n_true += 1 + return n_true +``` + +- [ ] **Step 4: Run the completeness-predicate tests** + +Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_idempotent_ingest.py -k "searchable and not ingest" -v` +Expected: all `test_compute_searchable_*` PASS. + +- [ ] **Step 5: Commit** + +```bash +cd ~/legal-ai +git add mcp-server/src/legal_mcp/services/db.py +git commit -m "feat(data-model): V21 searchable flag + recompute_searchable (GAP-13, FU-2a)" +``` + +--- + +## Task 5: Wire `recompute_searchable` into ingest + +**Files:** Modify `mcp-server/src/legal_mcp/services/ingest.py` + +- [ ] **Step 1: Call recompute after statuses are set in `ingest_document`** + +In `ingest.py`, find the block (added by FU-1) that sets statuses + queues extraction: +```python + await db.set_case_law_extraction_status(case_law_id, "completed") + await db.set_case_law_halacha_status(case_law_id, "pending") + await db.request_metadata_extraction(case_law_id) + await db.request_halacha_extraction(case_law_id) +``` +Immediately AFTER `request_halacha_extraction`, add: +```python + await db.recompute_searchable(case_law_id) +``` + +> Rationale: at this point chunks+embeddings are stored and extraction_status is +> completed, so the completeness predicate is meaningful. Metadata may still be +> pending (queued), so the row may compute searchable=false until metadata fills — +> the metadata extractor also calls recompute (Task 5 Step 2). + +- [ ] **Step 2: Call recompute after metadata extraction fills fields** + +In `mcp-server/src/legal_mcp/services/precedent_metadata_extractor.py`, find `extract_and_apply`'s success path (where it persists the filled metadata fields). After the DB update that writes the extracted metadata, add a call: +```python + await db.recompute_searchable(case_law_id) +``` +(Import `db` is already present in that module; if not, add `from legal_mcp.services import db`. Confirm by reading the file's imports first.) + +- [ ] **Step 3: Run the ingest-wiring test** + +Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_idempotent_ingest.py -k "ingest_calls_recompute" -v` +Expected: `test_ingest_calls_recompute_searchable` PASS. + +- [ ] **Step 4: Commit** + +```bash +cd ~/legal-ai +git add mcp-server/src/legal_mcp/services/ingest.py mcp-server/src/legal_mcp/services/precedent_metadata_extractor.py +git commit -m "feat(ingest): recompute searchable on ingest + metadata completion (GAP-13, FU-2a)" +``` + +--- + +## Task 6: DB smoke + dry-run + GATED search filter + +**Files:** Modify search layer ONLY if dry-run is clean (see Step 4). + +- [ ] **Step 1: Apply the V21 migration to the local DB and smoke-test upsert idempotency** + +Run (sources env, exercises real Postgres): +```bash +cd ~/legal-ai && set -a && source ~/.env && set +a +cd mcp-server && .venv/bin/python -c " +import asyncio, uuid +from legal_mcp.services import db +async def main(): + await db.get_pool() # runs migrations incl V21 + # idempotent internal upsert: same (case_number, proceeding_type) twice + cn = 'ZZ9999/24' + r1 = await db.create_internal_committee_decision(case_number=cn, case_name='t', full_text='x', practice_area='rishuy_uvniya') + r2 = await db.create_internal_committee_decision(case_number=cn, case_name='t2', full_text='x2', practice_area='rishuy_uvniya') + assert r1['id'] == r2['id'], 'upsert must update, not duplicate' + # cleanup + pool = await db.get_pool() + async with pool.acquire() as c: + await c.execute(\"DELETE FROM case_law WHERE case_number = 'ZZ9999-24'\") + print('UPSERT IDEMPOTENT OK; normalized stored as ZZ9999-24') +asyncio.run(main()) +" +``` +Expected: `UPSERT IDEMPOTENT OK` and no duplicate. (Note: `ZZ9999/24` normalizes to `ZZ9999-24` — confirms write-time normalization too.) + +- [ ] **Step 2: Backfill the `searchable` flag (recompute, reversible)** + +```bash +cd ~/legal-ai && set -a && source ~/.env && set +a +cd mcp-server && .venv/bin/python -c " +import asyncio +from legal_mcp.services import db +async def main(): + n = await db.recompute_searchable() + print('recompute_searchable: rows now searchable =', n) +asyncio.run(main()) +" +``` + +- [ ] **Step 3: Dry-run report — which rows would drop from search if the filter is enabled** + +```bash +cd ~/legal-ai && set -a && source ~/.env && set +a +PGPASSWORD="$POSTGRES_PASSWORD" psql "host=$POSTGRES_HOST port=$POSTGRES_PORT dbname=$POSTGRES_DB user=$POSTGRES_USER" -c " +SELECT source_kind, + count(*) AS total, + count(*) FILTER (WHERE NOT searchable) AS would_drop +FROM case_law GROUP BY source_kind ORDER BY source_kind;" +``` +Report the table to the controller. **Decision gate:** if `would_drop` includes legitimate, currently-findable precedents (e.g. external_upload / internal_committee rows that users rely on), DO NOT enable the search filter in Step 4 — stop and report; the filter waits for FU-2b. If `would_drop` is only genuinely-incomplete rows, proceed. + +- [ ] **Step 4: (GATED) Enable `searchable = true` filter in the search layer** + +ONLY if Step 3 is clean. Read `mcp-server/src/legal_mcp/services/hybrid_search.py` to find the `case_law` WHERE clauses in `search_precedent_library_hybrid` / `search_documents_hybrid`. Add `AND cl.searchable = true` (alias as used in that query) to the case_law-joined precedent search paths. Add a focused test asserting a non-searchable row is excluded (monkeypatch or DB smoke). If deferred, write a one-line note in the spec §7 that the filter is pending FU-2b and skip. + +- [ ] **Step 5: Add health-check visibility** + +Find the health-check endpoint/function (search `def health` / `processing_status` in `web/app.py` or `tools/`). Add a field `non_searchable_case_law = SELECT count(*) FROM case_law WHERE NOT searchable`. Keep it a single cheap COUNT. + +- [ ] **Step 6: Commit** + +```bash +cd ~/legal-ai +git add -A mcp-server/ web/ +git commit -m "feat(retrieval): gated searchable filter + health-check visibility (GAP-13, FU-2a)" +``` + +--- + +## Task 7: Full suite + smoke + lint + TaskMaster + +- [ ] **Step 1: Full test suite** + +Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/ -q` +Expected: all pass (the FU-1 77 + new FU-2a tests). Report the summary line. + +- [ ] **Step 2: Smoke-import** + +Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -c "from legal_mcp.services import db, ingest, precedent_library, internal_decisions; print('clean')"` +Expected: `clean`. + +- [ ] **Step 3: Lint changed files (if ruff available)** + +Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m ruff check src/legal_mcp/services/db.py src/legal_mcp/services/ingest.py 2>/dev/null; echo "exit=$?"` +Expected: clean or "ruff not available". + +- [ ] **Step 4: Mark TaskMaster #60 + subtasks done** + +Controller handles this (edit `.taskmaster/tasks/tasks.json`, verify via MCP get_task). Subtasks 60.1 (GAP-03), 60.2 (GAP-06), 60.5 (GAP-13). + +--- + +## Self-Review Notes + +- **GAP-03** → Task 3 (ON CONFLICT both functions). **GAP-06** → Task 2 (`_canonical_case_number` + write-time, type-aware). **GAP-13** → Tasks 4-5 (column + recompute + wiring) and gated Task 6 (filter). +- **No identifier migration** — FU-2b (#67) owns GAP-07/08. The V21 backfill only sets a derived, reversible flag. +- **Gated search filter** (Task 6 Step 3-4): the behavior-visible change is contingent on a clean dry-run; otherwise deferred. Surface the dry-run table to the user. +- **Offline-test limitation:** ON CONFLICT needs real Postgres → verified by Task 6 Step 1 smoke; offline tests cover the pure logic (normalize, completeness) and ingest wiring. +- **Type-consistency:** `_canonical_case_number`, `_compute_searchable(row, has_embedded_chunk)`, `recompute_searchable(case_law_id=None)` — names used identically in tests (Task 1) and impl (Tasks 2,4). diff --git a/docs/superpowers/specs/2026-05-30-fu2a-idempotent-ingest-design.md b/docs/superpowers/specs/2026-05-30-fu2a-idempotent-ingest-design.md new file mode 100644 index 0000000..b876102 --- /dev/null +++ b/docs/superpowers/specs/2026-05-30-fu2a-idempotent-ingest-design.md @@ -0,0 +1,140 @@ +# FU-2a — Idempotent Ingest + Write-Time Normalization + `searchable` Flag — עיצוב + +**סטטוס:** מאושר-לעיצוב · **תאריך:** 2026-05-30 · **ענף:** TBD +**מכסה:** GAP-03, GAP-06, GAP-13 · **מספק:** INV-ING2, INV-G3, INV-G1, INV-ID1, INV-DM1 +**מקורות:** [01-ingest.md](../../spec/01-ingest.md), [02-data-model.md](../../spec/02-data-model.md), [X1-identifiers.md](../../spec/X1-identifiers.md), [gap-audit.md](../../spec/gap-audit.md) +**משימה:** TaskMaster #60 · **תלוי ב:** FU-1 (#59) · **סוג:** pure-code (schema-additive) +**מיגרציה:** אין מיגרציית-מזהים (GAP-07/08 פוצלו ל-#67 / FU-2b). דגל `searchable` נגזר ו-recompute-בלבד. + +--- + +## 1. היקף ומה מחוץ להיקף + +FU-2 פוצל (החלטת-יו"ר 2026-05-30) לאחר שבדיקת-DB גילתה נתונים מבולגנים מהצפוי: +~52/56 רשומות `internal_committee` מחזיקות **ציטוט מלא** ב-`case_number`, יש ≥1 כפילות +(`8047-23`), ו-GAP-07 ("with-month canonical") דורש את המספר הרשמי שהוקצה — ידע-יו"ר. + +- **בהיקף (FU-2a, כאן):** GAP-03 (upsert idempotent), GAP-06 (נרמול-בכתיבה), GAP-13 (`searchable`). + הכל **pure-code / schema-additive**, משנה התנהגות *קדימה*, אפס מוטציה של מזהים קיימים. +- **מחוץ להיקף (FU-2b, #67):** GAP-07 (תיאום מזהים מעורבים), GAP-08 (ניקוי ציטוט-כמזהה) — + מיגרציית-נתונים שמערבת dedup, סקירת-יו"ר per-record, גיבוי, reversibility. + +**אינטראקציה FU-2a↔FU-2b (מתועד):** נרמול-בכתיבה חל רק על כתיבות *חדשות*. רשומות-עבר עם +ציטוט-מלא לא משתנות עד FU-2b. קליטה-חוזרת של רשומת-עבר מבולגנת תיצור רשומה *נקייה* חדשה +(לא תתנגש על המחרוזת השונה) — FU-2b יאחד. זה עקבי עם forward-only של FU-1. + +## 2. הכרעות אדריכליות (מאומתות ≥3 מקורות) + +| החלטה | נימוק | מקורות | +|-------|--------|--------| +| `INSERT … ON CONFLICT DO UPDATE` במקום SELECT-then-INSERT/UPDATE | אטומי, נטול race תחת Read-Committed; ה-SELECT-then-write הוא read-modify-write קלאסי | PostgreSQL INSERT docs; QueryPlane; on-systems.tech | +| **לחזור על predicate של ה-partial-index ב-ON CONFLICT** | V15 משתמש ב-partial unique indexes; Postgres דורש את ה-predicate ב-conflict target | PostgreSQL INSERT docs (§ON CONFLICT); QueryPlane gotchas | +| נרמול case_number **בכתיבה**, type-aware | נרמול הוא אחריות-גבול-קלט; ערכים שווי-משמעות → פלט זהה. פסיקה-חיצונית: הציטוט *הוא* המזהה → לא לחתוך | DDD value-objects (Medium/dev.to); gojko.net | +| דגל `searchable` **materialized** ונגזר-מחדש, לא מוסק בכל query | reify את חוזה-השלמות; חייב להיות נראה ל-health-check (לא הסקה סמויה) | DevIQ MISU; functional-architecture.org; Stemmler | + +## 3. הקבצים + +- **Modify** `mcp-server/src/legal_mcp/services/db.py`: + - `create_external_case_law` — להמיר ל-`ON CONFLICT` (target: `(case_number) WHERE source_kind <> 'internal_committee'`); זה גם מטפל בקידום `cited_only`→`external_upload` (אותו partial-index). לא לחתוך את ה-citation (זהו המזהה). + - `create_internal_committee_decision` — להמיר ל-`ON CONFLICT (case_number, proceeding_type) WHERE source_kind = 'internal_committee'`; לנרמל `case_number` בכניסה. + - `create_case` — לנרמל `case_number` בכניסה (כתיבה). + - הוספת helper `_canonical_case_number(s)` (שם מפורש; עוטף את הטרנספורם הדטרמיניסטי trim·prefix-strip·/→- של X1). `_normalize_case_number` הקיים (read-time) נשאר כ-shim. + - מיגרציית-schema **V21**: `ALTER TABLE case_law ADD COLUMN searchable boolean NOT NULL DEFAULT false`. + - פונקציה `recompute_searchable(case_law_id|all)` — נגזרת מחוזה-השלמות; נקראת בסיום-קליטה ובסיום-חילוץ-metadata. +- **Modify** `mcp-server/src/legal_mcp/services/ingest.py` — בסיום הצלחת הקליטה, לקרוא `db.recompute_searchable(case_law_id)` (אחיד לכל סוג; אחרי setting statuses). +- **Test** `mcp-server/tests/test_idempotent_ingest.py` (חדש) — offline, monkeypatched. + +**גבול:** אין שינוי לחתימות הציבוריות של `ingest_precedent`/`ingest_internal_decision` (FU-1). +הנרמול וה-upsert יושבים בשכבת-ה-DB (גבול-הכתיבה), שקופים לקוראים. + +## 4. נרמול type-aware (GAP-06) + +`_canonical_case_number(s)` — דטרמיניסטי, תואם X1 §1, **לא מוסיף/מסיר חודש**: +``` +trim → strip prefix לפני הספרה הראשונה → להחליף '/' ב-'-' +``` + +| נקודת-כתיבה | מדיניות | נימוק | +|--------------|---------|--------| +| `create_internal_committee_decision` | `_canonical_case_number(case_number)` | המזהה הקנוני = מספר-בסיס מנורמל | +| `create_case` | `_canonical_case_number(case_number)` | תיק פעיל — אותו כלל | +| `create_external_case_law` | `.strip()` בלבד (ללא prefix-strip) | פסיקה חיצונית: ה-citation הוא המזהה הקנוני (X1 §1); חיתוך היה הורס אותו | + +> נרמול מטפל ב-prefix+separator בלבד. קלט שהוא ציטוט-מלא (party names, נבו) **לא** מנוקה ל-bare +> ע"י הנרמול — זה GAP-08/FU-2b. FU-2a מבטיח שקלט נקי-יחסית נשמר בצורה קנונית. + +## 5. Idempotent upsert (GAP-03) + +שתי פונקציות-ה-create עוברות מ-SELECT-then-INSERT/UPDATE ל-`INSERT … ON CONFLICT … DO UPDATE`, +עם **חזרה על ה-predicate** של ה-partial-index (V15): + +- **internal:** `ON CONFLICT (case_number, proceeding_type) WHERE source_kind = 'internal_committee' DO UPDATE SET …` +- **external:** `ON CONFLICT (case_number) WHERE source_kind <> 'internal_committee' DO UPDATE SET …` + — מחליף את לוגיקת ה-SELECT הקיימת, **כולל** קידום `cited_only`→`external_upload` (אותה partial- + index חלה על שניהם; ה-DO UPDATE מקדם את source_kind וממלא שדות חסרים). + +**`DO UPDATE` ממוקד:** רק שדות-קלט לא-ריקים דורסים (לשמר ערכים קיימים; `COALESCE(EXCLUDED.x, case_law.x)`), +ולא לדרוס מטא-דאטה שמולא ע"י חילוץ-LLM. אם ל-`case_law` יש טריגרי-`updated_at` — לסנן עם `WHERE` +על שינוי בפועל (gotcha מהמחקר). re-embed בקליטה-חוזרת = INV-ING4, שייך ל-FU-3 — כאן רק upsert-הרשומה. + +## 6. דגל `searchable` (GAP-13) + +עמודה חדשה `case_law.searchable boolean NOT NULL DEFAULT false`. **נגזרת** מחוזה-השלמות +(02-data-model §2a / INV-DM1), לא מוסקת ב-query: + +``` +searchable = ( + case_number/citation קנוני לא-ריק + AND case_name<>'' AND practice_area<>'' AND source_kind<>'' + AND EXISTS(precedent_chunk עם embedding NOT NULL) + AND extraction_status='completed' + AND (headnote<>'' OR summary<>'' OR jsonb_array_length(subject_tags)>0) +) +``` + +- `recompute_searchable(case_law_id)` נקראת בסיום-קליטה (ingest.py) ובסיום `precedent_metadata_extractor`. +- **Backfill (recompute-בלבד, הפיך):** מיגרציה V21 מריצה `recompute_searchable(all)` פעם אחת על רשומות + קיימות. זו גזירה הפיכה (ניתן להריץ שוב כל רגע) — אינה נוגעת במזהים, לא חלק מ-FU-2b. +- שכבת-החיפוש (`search_*`) תסונן ל-`searchable=true` — **שינוי-התנהגות מתועד** (ראה §7). +- health-check יחשוף `count(*) FILTER (WHERE NOT searchable)` (זרע ל-GAP-14/FU-5). + +## 7. שינויי-התנהגות וסיכון + +| שינוי | השפעה | סיכון | +|--------|--------|--------| +| upsert ON CONFLICT | קליטה-חוזרת = update אטומי, לא כפילות; קידום cited_only נשמר | נמוך — מאומת מול partial-index הקיים | +| נרמול-בכתיבה (internal/cases) | קלט חדש נשמר כ-bare מנורמל | נמוך — type-aware; external לא נחתך | +| `searchable` מסנן חיפוש | רשומות שלא עומדות בחוזה-השלמות **לא יוחזרו** | ⚠️ בינוני — backfill עלול לסמן רשומות-עבר כ-non-searchable. **אימות:** להריץ recompute ב-dry-run ולדווח כמה ירדו מהחיפוש *לפני* הפעלת הסינון | +| backfill searchable | דגל נגזר על רשומות קיימות | נמוך — הפיך, recompute-בלבד, לא נוגע במזהים | + +**אזהרת-backlog:** ה-rows עם ציטוט-מלא-כמזהה (FU-2b) עשויים בכל זאת לעמוד בחוזה-השלמות (יש להם +chunks+metadata), כך שהסינון לא בהכרח מפיל אותם. ה-dry-run ב-§7 יכמת זאת לפני הפעלה. + +## 8. אסטרטגיית בדיקה + +`tests/test_idempotent_ingest.py` — offline, monkeypatch ל-DB pool (או בדיקת-SQL מול sqlite-fallback אם קיים בפרויקט; אחרת monkeypatch כמו FU-1). מקרים: +1. `_canonical_case_number`: `"ערר 8137/24"`→`"8137-24"`, `"8126-03-25"`→`"8126-03-25"` (חודש נשמר), `" עע\"מ 1/20 "`→`"1-20"`. +2. נרמול type-aware: internal מנרמל; external **לא** חותך citation. +3. upsert: קליטה כפולה של אותו (case_number, proceeding_type) internal = רשומה אחת (לא שתיים). +4. upsert: קידום `cited_only`→`external_upload` על אותו case_number = עדכון, לא כפילות. +5. `DO UPDATE` ממוקד: מטא-דאטה קיים לא נדרס ע"י קלט ריק (COALESCE). +6. `recompute_searchable`: רשומה מלאה→true; חסרת-embedding/metadata/extraction→false. +7. ingest קורא recompute_searchable בסיום (שני הסוגים). + +> בדיקת ON CONFLICT האמיתית דורשת Postgres. אם אין מסלול-בדיקה מול DB אמיתי בפרויקט, +> הבדיקות יאמתו את בניית-ה-SQL ואת הלוגיקה הטהורה (normalize, completeness predicate) ב-offline, +> ושכבת-ה-SQL תיבדק ב-smoke מול ה-DB המקומי (5433) ידנית בסיום, מתועד בתוכנית. + +## 9. סדר-ביצוע + +1. בדיקות אדומות (`test_idempotent_ingest.py`). +2. `_canonical_case_number` + נרמול-בכתיבה ב-3 פונקציות ה-create. +3. המרת שתי create ל-`ON CONFLICT … DO UPDATE` (עם predicate חוזר + COALESCE ממוקד). +4. מיגרציה V21: עמודה `searchable` + `recompute_searchable` + backfill recompute. +5. קריאה ל-`recompute_searchable` מ-ingest.py; חשיפת `count FILTER (WHERE NOT searchable)` ב-health-check. +6. **dry-run** של backfill מול DB 5433 → לדווח כמה רשומות יסומנו `searchable=false` ומאילו source_kind. +7. **שער החלטה (gated):** סינון `searchable=true` בשכבת-החיפוש מופעל **רק אם** ה-dry-run מראה + שאף רשומה לגיטימית לא יורדת מהחיפוש. אם רשומות-עבר לגיטימיות היו נופלות (למשל מבולגנות-FU-2b + שעדיין שמישות) — לדחות את הפעלת-הסינון לפולואו-אפ אחרי FU-2b, ולהשאיר את העמודה+health-check בלבד. + (להציף את ממצא ה-dry-run למשתמש לפני הפעלה — שינוי חיפוש הוא פעולה גלויה.) +8. בדיקות ירוקות + smoke מול DB מקומי + lint. diff --git a/mcp-server/src/legal_mcp/services/db.py b/mcp-server/src/legal_mcp/services/db.py index ac9ac70..632746e 100644 --- a/mcp-server/src/legal_mcp/services/db.py +++ b/mcp-server/src/legal_mcp/services/db.py @@ -1094,6 +1094,18 @@ CREATE INDEX IF NOT EXISTS idx_case_law_meta_tsv """ +# ── V21: explicit `searchable` flag (GAP-13 / INV-DM1) ───────────── +# Materialized completeness flag — a case_law row is exposed to search only +# when it satisfies the completeness contract (02-data-model §2a). Recomputed +# on ingest/metadata completion via recompute_searchable(); not inferred at +# query time. Default false so a freshly-inserted row is excluded until proven +# complete. Health-check surfaces count(*) FILTER (WHERE NOT searchable). +SCHEMA_V21_SQL = """ +ALTER TABLE case_law ADD COLUMN IF NOT EXISTS searchable boolean NOT NULL DEFAULT false; +CREATE INDEX IF NOT EXISTS idx_case_law_searchable ON case_law (searchable); +""" + + async def _run_schema_migrations(pool: asyncpg.Pool) -> None: async with pool.acquire() as conn: await conn.execute(SCHEMA_SQL) @@ -1117,7 +1129,8 @@ async def _run_schema_migrations(pool: asyncpg.Pool) -> None: await conn.execute(SCHEMA_V18_SQL) await conn.execute(SCHEMA_V19_SQL) await conn.execute(SCHEMA_V20_SQL) - logger.info("Database schema initialized (v1-v20)") + await conn.execute(SCHEMA_V21_SQL) + logger.info("Database schema initialized (v1-v21)") async def init_schema() -> None: @@ -1155,7 +1168,7 @@ async def create_case( hearing_date, notes, expected_outcome, practice_area, appeal_subtype, proceeding_type) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)""", - case_id, case_number, title, + case_id, _canonical_case_number(case_number), title, json.dumps(appellants or []), json.dumps(respondents or []), subject, property_address, permit_number, committee_type, @@ -1211,6 +1224,21 @@ def _normalize_case_number(s: str) -> str: return s.strip().replace("/", "-") +def _canonical_case_number(s: str) -> str: + """Canonical write-time form per X1 §1: trim · prefix-strip · '/'→'-'. + + Deterministic and format-only — does NOT add or remove a month segment. + Used at the write boundary for identifier-keyed corpora (internal + committee decisions, active cases). NOT for external precedents, whose + canonical identifier is the full citation. + """ + s = (s or "").strip() + m = re.search(r"\d", s) + if m: + s = s[m.start():] + return s.strip().replace("/", "-") + + async def get_case_by_number(case_number: str) -> dict | None: pool = await get_pool() norm = _normalize_case_number(case_number) @@ -2566,61 +2594,54 @@ async def create_external_case_law( pool = await get_pool() tags_json = json.dumps(subject_tags or [], ensure_ascii=False) async with pool.acquire() as conn: - existing = await conn.fetchrow( - "SELECT id, source_kind FROM case_law WHERE case_number = $1", - case_number, - ) - if existing: - row = await conn.fetchrow( - """ - UPDATE case_law SET - case_name = $2, - court = COALESCE(NULLIF($3, ''), court), - date = COALESCE($4, date), - practice_area = $5, - appeal_subtype = $6, - subject_tags = $7, - summary = COALESCE(NULLIF($8, ''), summary), - headnote = $9, - key_quote = COALESCE(NULLIF($10, ''), key_quote), - full_text = $11, - source_url = COALESCE(NULLIF($12, ''), source_url), - source_type = $13, - precedent_level = $14, - is_binding = $15, - document_id = COALESCE($16, document_id), - source_kind = 'external_upload', - extraction_status = 'processing', - halacha_extraction_status = 'pending' - WHERE id = $1 - RETURNING * - """, - existing["id"], case_name, court, decision_date, - practice_area, appeal_subtype, tags_json, summary, headnote, - key_quote, full_text, source_url, source_type, - precedent_level, is_binding, document_id, - ) - else: - row = await conn.fetchrow( - """ - INSERT INTO case_law ( - case_number, case_name, court, date, subject_tags, - summary, key_quote, full_text, source_url, - source_kind, document_id, extraction_status, - halacha_extraction_status, practice_area, appeal_subtype, - headnote, source_type, precedent_level, is_binding - ) VALUES ( - $1, $2, $3, $4, $5, $6, $7, $8, $9, - 'external_upload', $10, 'processing', 'pending', - $11, $12, $13, $14, $15, $16 - ) - RETURNING * - """, - case_number, case_name, court, decision_date, tags_json, + # Atomic upsert on the V15 partial unique index + # uq_case_law_external_number (case_number) WHERE source_kind <> 'internal_committee'. + # The predicate is repeated in ON CONFLICT (required for partial indexes). + # This also subsumes the old cited_only→external_upload promotion: a + # cited_only row with the same case_number conflicts and is promoted by + # DO UPDATE. Scoped to the external partial index, so an internal row with + # the same number is NOT touched (the old SELECT-without-source_kind could + # wrongly promote it). + row = await conn.fetchrow( + """ + INSERT INTO case_law ( + case_number, case_name, court, date, subject_tags, summary, key_quote, full_text, source_url, - document_id, practice_area, appeal_subtype, headnote, - source_type, precedent_level, is_binding, + source_kind, document_id, extraction_status, + halacha_extraction_status, practice_area, appeal_subtype, + headnote, source_type, precedent_level, is_binding + ) VALUES ( + $1, $2, $3, $4, $5, $6, $7, $8, $9, + 'external_upload', $10, 'processing', 'pending', + $11, $12, $13, $14, $15, $16 ) + ON CONFLICT (case_number) WHERE source_kind <> 'internal_committee' + DO UPDATE SET + case_name = EXCLUDED.case_name, + court = COALESCE(NULLIF(EXCLUDED.court, ''), case_law.court), + date = COALESCE(EXCLUDED.date, case_law.date), + practice_area = EXCLUDED.practice_area, + appeal_subtype = EXCLUDED.appeal_subtype, + subject_tags = EXCLUDED.subject_tags, + summary = COALESCE(NULLIF(EXCLUDED.summary, ''), case_law.summary), + headnote = EXCLUDED.headnote, + key_quote = COALESCE(NULLIF(EXCLUDED.key_quote, ''), case_law.key_quote), + full_text = EXCLUDED.full_text, + source_url = COALESCE(NULLIF(EXCLUDED.source_url, ''), case_law.source_url), + source_type = EXCLUDED.source_type, + precedent_level = EXCLUDED.precedent_level, + is_binding = EXCLUDED.is_binding, + document_id = COALESCE(EXCLUDED.document_id, case_law.document_id), + source_kind = 'external_upload', + extraction_status = 'processing', + halacha_extraction_status = 'pending' + RETURNING * + """, + case_number, case_name, court, decision_date, tags_json, + summary, key_quote, full_text, source_url, + document_id, practice_area, appeal_subtype, headnote, + source_type, precedent_level, is_binding, + ) return _row_to_case_law(row) @@ -2647,67 +2668,123 @@ async def create_internal_committee_decision( filed against an existing appeal with the same number). """ pool = await get_pool() + case_number = _canonical_case_number(case_number) tags_json = json.dumps(subject_tags or [], ensure_ascii=False) async with pool.acquire() as conn: - existing = await conn.fetchrow( - "SELECT id FROM case_law " - "WHERE case_number = $1 AND proceeding_type = $2 " - " AND source_kind = 'internal_committee'", - case_number, proceeding_type, + # Atomic upsert on V15 partial unique index + # uq_case_law_internal_number_proc (case_number, proceeding_type) + # WHERE source_kind = 'internal_committee'. Predicate repeated for the + # partial index. Replaces the old SELECT-then-INSERT/UPDATE (race-prone). + row = await conn.fetchrow( + """ + INSERT INTO case_law ( + case_number, case_name, court, date, chair_name, district, + subject_tags, summary, full_text, + source_kind, source_type, document_id, + extraction_status, halacha_extraction_status, + practice_area, appeal_subtype, is_binding, proceeding_type + ) VALUES ( + $1, $2, $3, $4, $5, $6, + $7, $8, $9, + 'internal_committee', 'appeals_committee', $10, + 'processing', 'pending', + $11, $12, $13, $14 + ) + ON CONFLICT (case_number, proceeding_type) + WHERE source_kind = 'internal_committee' + DO UPDATE SET + case_name = EXCLUDED.case_name, + court = COALESCE(NULLIF(EXCLUDED.court, ''), case_law.court), + date = COALESCE(EXCLUDED.date, case_law.date), + chair_name = COALESCE(NULLIF(EXCLUDED.chair_name, ''), case_law.chair_name), + district = COALESCE(NULLIF(EXCLUDED.district, ''), case_law.district), + practice_area = EXCLUDED.practice_area, + appeal_subtype = EXCLUDED.appeal_subtype, + subject_tags = EXCLUDED.subject_tags, + summary = COALESCE(NULLIF(EXCLUDED.summary, ''), case_law.summary), + full_text = EXCLUDED.full_text, + source_type = 'appeals_committee', + source_kind = 'internal_committee', + is_binding = EXCLUDED.is_binding, + document_id = COALESCE(EXCLUDED.document_id, case_law.document_id), + extraction_status = 'processing', + halacha_extraction_status = 'pending' + RETURNING * + """, + case_number, case_name, court, decision_date, chair_name, district, + tags_json, summary, full_text, + document_id, practice_area, appeal_subtype, is_binding, + proceeding_type, ) - if existing: - row = await conn.fetchrow( - """ - UPDATE case_law SET - case_name = $2, - court = COALESCE(NULLIF($3, ''), court), - date = COALESCE($4, date), - chair_name = COALESCE(NULLIF($5, ''), chair_name), - district = COALESCE(NULLIF($6, ''), district), - practice_area = $7, - appeal_subtype = $8, - subject_tags = $9, - summary = COALESCE(NULLIF($10, ''), summary), - full_text = $11, - source_type = 'appeals_committee', - source_kind = 'internal_committee', - is_binding = $12, - document_id = COALESCE($13, document_id), - extraction_status = 'processing', - halacha_extraction_status = 'pending' - WHERE id = $1 - RETURNING * - """, - existing["id"], case_name, court, decision_date, - chair_name, district, practice_area, appeal_subtype, - tags_json, summary, full_text, is_binding, document_id, - ) - else: - row = await conn.fetchrow( - """ - INSERT INTO case_law ( - case_number, case_name, court, date, chair_name, district, - subject_tags, summary, full_text, - source_kind, source_type, document_id, - extraction_status, halacha_extraction_status, - practice_area, appeal_subtype, is_binding, proceeding_type - ) VALUES ( - $1, $2, $3, $4, $5, $6, - $7, $8, $9, - 'internal_committee', 'appeals_committee', $10, - 'processing', 'pending', - $11, $12, $13, $14 - ) - RETURNING * - """, - case_number, case_name, court, decision_date, chair_name, district, - tags_json, summary, full_text, - document_id, practice_area, appeal_subtype, is_binding, - proceeding_type, - ) return _row_to_case_law(row) +def _compute_searchable(row: dict, has_embedded_chunk: bool) -> bool: + """Completeness contract (INV-DM1 / 02-data-model §2a). + + A row is searchable IFF: canonical id present · case_name/practice_area/ + source_kind present · ≥1 chunk with a non-null embedding · extraction + completed · metadata non-empty (≥1 of headnote/summary/subject_tags). + Pure — `has_embedded_chunk` is supplied by the caller (cross-table check). + """ + if not has_embedded_chunk: + return False + if (row.get("extraction_status") or "") != "completed": + return False + if not (row.get("case_number") or "").strip(): + return False + if not (row.get("case_name") or "").strip(): + return False + # practice_area is required only for identifier-keyed corpora (internal + # committee decisions, active cases). External precedents (e.g. בג"ץ) are + # legitimately cross-domain and may have no single practice_area. + if (row.get("source_kind") or "") != "external_upload": + if not (row.get("practice_area") or "").strip(): + return False + if not (row.get("source_kind") or "").strip(): + return False + tags = row.get("subject_tags") or [] + has_meta = bool((row.get("headnote") or "").strip()) \ + or bool((row.get("summary") or "").strip()) \ + or (len(tags) > 0) + return has_meta + + +async def recompute_searchable(case_law_id: "UUID | str | None" = None) -> int: + """Recompute and persist the `searchable` flag. Idempotent / reversible. + + If case_law_id is None, recompute ALL rows (used by the V21 backfill and + the dry-run). Returns the number of rows now marked searchable=true. + """ + pool = await get_pool() + async with pool.acquire() as conn: + if case_law_id is not None: + cid = case_law_id if isinstance(case_law_id, UUID) else UUID(str(case_law_id)) + rows = await conn.fetch( + "SELECT * FROM case_law WHERE id = $1", cid) + else: + rows = await conn.fetch("SELECT * FROM case_law") + n_true = 0 + for r in rows: + row = dict(r) + tags = row.get("subject_tags") + if isinstance(tags, str): + try: + tags = json.loads(tags) + except (ValueError, TypeError): + tags = [] + row["subject_tags"] = tags or [] + has_chunk = await conn.fetchval( + "SELECT EXISTS(SELECT 1 FROM precedent_chunks " + "WHERE case_law_id = $1 AND embedding IS NOT NULL)", row["id"]) + val = _compute_searchable(row, bool(has_chunk)) + await conn.execute( + "UPDATE case_law SET searchable = $2 WHERE id = $1", row["id"], val) + if val: + n_true += 1 + return n_true + + async def update_case_law(case_law_id: UUID, **fields) -> dict | None: """Patch metadata fields on a case_law row. @@ -3199,8 +3276,9 @@ async def search_precedent_library_semantic( halacha_filters = [ "h.review_status IN ('approved', 'published')", f"cl.source_kind = '{source_kind}'", + "cl.searchable = true", ] - chunk_filters = [f"cl.source_kind = '{source_kind}'"] + chunk_filters = [f"cl.source_kind = '{source_kind}'", "cl.searchable = true"] h_params: list = [query_embedding, limit] c_params: list = [query_embedding, limit] h_idx = 3 @@ -3435,8 +3513,9 @@ async def search_precedent_library_lexical( halacha_filters = [ "h.review_status IN ('approved', 'published')", f"cl.source_kind = '{source_kind}'", + "cl.searchable = true", ] - chunk_filters = [f"cl.source_kind = '{source_kind}'"] + chunk_filters = [f"cl.source_kind = '{source_kind}'", "cl.searchable = true"] # $1 = query, $2 = limit. Filters append starting at $3. h_params: list = [query, limit] c_params: list = [query, limit] diff --git a/mcp-server/src/legal_mcp/services/ingest.py b/mcp-server/src/legal_mcp/services/ingest.py index 45a9cc6..35d56f8 100644 --- a/mcp-server/src/legal_mcp/services/ingest.py +++ b/mcp-server/src/legal_mcp/services/ingest.py @@ -197,6 +197,7 @@ async def ingest_document( await db.set_case_law_halacha_status(case_law_id, "pending") await db.request_metadata_extraction(case_law_id) await db.request_halacha_extraction(case_law_id) + await db.recompute_searchable(case_law_id) await progress("completed", 100, f"נקלט: {stored_chunks} chunks. חילוץ הלכות ומטא-דאטה ממתינים בתור.") diff --git a/mcp-server/src/legal_mcp/services/metrics.py b/mcp-server/src/legal_mcp/services/metrics.py index ce43dda..c32b4f5 100644 --- a/mcp-server/src/legal_mcp/services/metrics.py +++ b/mcp-server/src/legal_mcp/services/metrics.py @@ -123,6 +123,9 @@ async def get_dashboard() -> dict: total_corpus = await conn.fetchval("SELECT COUNT(*) FROM style_corpus") total_patterns = await conn.fetchval("SELECT COUNT(*) FROM style_patterns") total_case_law = await conn.fetchval("SELECT COUNT(*) FROM case_law") + non_searchable_case_law = await conn.fetchval( + "SELECT COUNT(*) FROM case_law WHERE NOT searchable" + ) # QA summary qa_total = await conn.fetchval("SELECT COUNT(DISTINCT case_id) FROM qa_results") @@ -154,6 +157,7 @@ async def get_dashboard() -> dict: "style_corpus": total_corpus, "style_patterns": total_patterns, "case_law_entries": total_case_law, + "non_searchable_case_law": non_searchable_case_law, }, "cases_by_status": cases_by_status, "qa": { diff --git a/mcp-server/src/legal_mcp/services/precedent_metadata_extractor.py b/mcp-server/src/legal_mcp/services/precedent_metadata_extractor.py index 200d89f..a30e2ea 100644 --- a/mcp-server/src/legal_mcp/services/precedent_metadata_extractor.py +++ b/mcp-server/src/legal_mcp/services/precedent_metadata_extractor.py @@ -368,6 +368,8 @@ async def extract_and_apply( if not suggested: return {"status": "no_metadata", "fields": []} result = await apply_to_record(case_law_id, suggested, overwrite_case_number=overwrite_case_number) + if result["updated"]: + await db.recompute_searchable(case_law_id) return { "status": "completed" if result["updated"] else "no_changes", "fields": result["fields"], diff --git a/mcp-server/tests/test_idempotent_ingest.py b/mcp-server/tests/test_idempotent_ingest.py new file mode 100644 index 0000000..a78b754 --- /dev/null +++ b/mcp-server/tests/test_idempotent_ingest.py @@ -0,0 +1,119 @@ +"""FU-2a: idempotent ingest + write-time normalization + searchable flag. + +Offline tests for the *pure* pieces (canonical normalization, completeness +predicate) and ingest wiring. The real ON CONFLICT upsert is verified by a +DB smoke test against localhost:5433 (see plan Task 6), since it requires a +live Postgres partial unique index. +""" +from __future__ import annotations + +import asyncio +from uuid import uuid4 + +import pytest + +from legal_mcp.services import db, ingest + + +def _run(coro): + return asyncio.run(coro) + + +# ── GAP-06: canonical normalization (pure, deterministic) ────────────── +@pytest.mark.parametrize("raw,expected", [ + ("ערר 8137/24", "8137-24"), + (" עע\"מ 1/20 ", "1-20"), + ("8126-03-25", "8126-03-25"), # month segment preserved + ("בל\"מ 1010-01-25", "1010-01-25"), + ("8047/23", "8047-23"), +]) +def test_canonical_case_number(raw, expected): + assert db._canonical_case_number(raw) == expected + + +def test_canonical_does_not_invent_month(): + # No month in input → none added (X1 §1). + assert db._canonical_case_number("8126/24") == "8126-24" + + +# ── GAP-13: completeness predicate (pure) ────────────────────────────── +def _complete_row(): + return { + "case_number": "8047-23", "case_name": "פלוני נ' הוועדה", + "practice_area": "rishuy_uvniya", "source_kind": "internal_committee", + "extraction_status": "completed", "headnote": "תקציר", + "summary": "", "subject_tags": [], + } + + +def test_compute_searchable_true_when_complete(): + assert db._compute_searchable(_complete_row(), has_embedded_chunk=True) is True + + +def test_compute_searchable_false_without_embedded_chunk(): + assert db._compute_searchable(_complete_row(), has_embedded_chunk=False) is False + + +def test_compute_searchable_false_without_metadata(): + row = _complete_row() + row["headnote"] = ""; row["summary"] = ""; row["subject_tags"] = [] + assert db._compute_searchable(row, has_embedded_chunk=True) is False + + +def test_compute_searchable_false_when_extraction_incomplete(): + row = _complete_row(); row["extraction_status"] = "pending" + assert db._compute_searchable(row, has_embedded_chunk=True) is False + + +def test_compute_searchable_false_without_core_fields(): + row = _complete_row(); row["practice_area"] = "" + assert db._compute_searchable(row, has_embedded_chunk=True) is False + + +def test_compute_searchable_external_allows_empty_practice_area(): + # External precedents (e.g. בג"ץ) are cross-domain — empty practice_area + # must NOT disqualify them, as long as the rest of the contract holds. + row = _complete_row() + row["source_kind"] = "external_upload" + row["practice_area"] = "" + assert db._compute_searchable(row, has_embedded_chunk=True) is True + + +# ── ingest wires in recompute_searchable (both types) ────────────────── +def test_ingest_calls_recompute_searchable(monkeypatch, tmp_path): + calls = {"recompute": [], "meta": [], "hal": []} + + async def _extract_text(path): return ("text", 1, [0]) + monkeypatch.setattr(ingest.extractor, "extract_text", _extract_text) + monkeypatch.setattr(ingest.extractor, "strip_nevo_preamble", lambda t: t) + monkeypatch.setattr(ingest.chunker, "chunk_document", + lambda t, page_offsets=None: [type("C", (), { + "chunk_index": 0, "content": "c", "section_type": "b", + "page_number": 1})()]) + + async def _embed(texts, input_type="document"): return [[0.0] * 8 for _ in texts] + monkeypatch.setattr(ingest.embeddings, "embed_texts", _embed) + + async def _store(cid, dicts): return len(dicts) + monkeypatch.setattr(ingest.db, "store_precedent_chunks", _store) + + async def _create_internal(**kw): return {"id": uuid4()} + monkeypatch.setattr(ingest.db, "create_internal_committee_decision", _create_internal) + + async def _noop(*a, **k): return None + monkeypatch.setattr(ingest.db, "set_case_law_extraction_status", _noop) + monkeypatch.setattr(ingest.db, "set_case_law_halacha_status", _noop) + monkeypatch.setattr(ingest.db, "request_metadata_extraction", + lambda cid: calls["meta"].append(cid) or _noop()) + monkeypatch.setattr(ingest.db, "request_halacha_extraction", + lambda cid: calls["hal"].append(cid) or _noop()) + + async def _recompute(cid): calls["recompute"].append(cid) + monkeypatch.setattr(ingest.db, "recompute_searchable", _recompute) + monkeypatch.setattr(ingest.config, "PARENT_DOC_RETRIEVAL_ENABLED", False) + monkeypatch.setattr(ingest.config, "MULTIMODAL_ENABLED", False) + + from legal_mcp.services import internal_decisions + _run(internal_decisions.ingest_internal_decision( + case_number="8047/23", text="t", chair_name="x", practice_area="rishuy_uvniya")) + assert len(calls["recompute"]) == 1, "ingest must recompute searchable after success" diff --git a/mcp-server/tests/test_unified_ingest.py b/mcp-server/tests/test_unified_ingest.py index 08bbd1e..0456e51 100644 --- a/mcp-server/tests/test_unified_ingest.py +++ b/mcp-server/tests/test_unified_ingest.py @@ -72,6 +72,9 @@ def patched(monkeypatch, tmp_path): async def _set_status(cid, status): return None + async def _recompute_searchable(cid=None): + return 0 + monkeypatch.setattr(extractor, "extract_text", _extract_text) monkeypatch.setattr(extractor, "strip_nevo_preamble", _strip) monkeypatch.setattr(chunker, "chunk_document", _chunk) @@ -83,6 +86,7 @@ def patched(monkeypatch, tmp_path): monkeypatch.setattr(db, "request_halacha_extraction", _req_hal) monkeypatch.setattr(db, "set_case_law_extraction_status", _set_status) monkeypatch.setattr(db, "set_case_law_halacha_status", _set_status) + monkeypatch.setattr(db, "recompute_searchable", _recompute_searchable) # Force flat chunking + multimodal OFF unless a test flips it. monkeypatch.setattr(config, "PARENT_DOC_RETRIEVAL_ENABLED", False) monkeypatch.setattr(config, "MULTIMODAL_ENABLED", False)