FU-2a: idempotent ingest + write-time normalization + searchable flag (GAP-03/06/13) #12

Merged
chaim merged 9 commits from fix/fu2a-idempotent-ingest into main 2026-05-30 21:06:33 +00:00
9 changed files with 1111 additions and 135 deletions

View File

@@ -2056,9 +2056,9 @@
},
{
"id": "60",
"title": "[FU-2] ingest idempotent + מזהים קנוניים",
"description": "מפתח-upsert דטרמיניסטי + נרמול case_number בכתיבה + תיאום מספרים מעורבים.",
"details": "מכסה GAP-03,06,07,08,13. מספק INV-ING2/G3/G1/ID1/ID2/DM2/DM1. severity: Critical. סוג: קוד + מיגרציית-נתונים (דורש אישור-עלות). תלוי ב-FU-1.",
"title": "[FU-2a] ingest idempotent + נרמול-בכתיבה + searchable (pure-code)",
"description": "upsert ON CONFLICT על מפתח קנוני + נרמול case_number בכתיבה (type-aware) + דגל searchable מפורש. אפס מיגרציית-נתונים.",
"details": "מכסה GAP-03,06,13. מספק INV-ING2/G3/G1/ID1/DM1. severity: Critical. סוג: pure-code (schema-additive). תלוי ב-FU-1 (#59). FU-2b (#67) מטפל ב-GAP-07/08 בנפרד.",
"testStrategy": "",
"status": "pending",
"dependencies": [
@@ -2086,26 +2086,6 @@
"testStrategy": "",
"parentId": "60"
},
{
"id": 3,
"title": "[GAP-07] תיאום מספרי-תיק מעורבים (with-month canonical)",
"description": "מיגרציה חד-פעמית; הצורה עם-חודש קנונית (החלטת-יו\"ר).",
"dependencies": [],
"details": "INV-ID1",
"status": "pending",
"testStrategy": "",
"parentId": "60"
},
{
"id": 4,
"title": "[GAP-08] הסרת ציטוט-מלא כ-case_number",
"description": "רשומות עם ציטוט מלא כמזהה (legacy).",
"dependencies": [],
"details": "INV-DM2/ID2",
"status": "pending",
"testStrategy": "",
"parentId": "60"
},
{
"id": 5,
"title": "[GAP-13] שדה searchable מפורש",
@@ -2358,6 +2338,40 @@
}
],
"updatedAt": "2026-05-30T17:37:34.741136+00:00"
},
{
"id": "67",
"title": "[FU-2b] תיאום מזהים קנוניים + ניקוי ציטוט-כמזהה (data-migration, chair)",
"description": "מיגרציה חד-פעמית של ~52+ רשומות case_law עם ציטוט-מלא ב-case_number → מספר-בסיס מנורמל; dedup (למשל 8047-23 כפול); הכרעת צורה קנונית per-record.",
"details": "מכסה GAP-07,08. מספק INV-ID1/ID2/DM2. severity: High. סוג: DATA-MIGRATION + chair-decision (מספר רשמי per-record, with-month canonical). דורש: גיבוי, dry-run, סקירת-יו\"ר, reversibility. תלוי ב-FU-2a (#60, לצורך פונקציית הנרמול). מקור: בדיקת DB 2026-05-30 — internal_committee ~52/56 ציטוט-מלא, ≥1 dup (8047-23), 1 בלתי-פתיר (ערר אדלר/cited_only).",
"testStrategy": "",
"status": "pending",
"dependencies": [
"60"
],
"priority": "high",
"subtasks": [
{
"id": 1,
"title": "[GAP-07] תיאום מספרי-תיק מעורבים (with-month canonical)",
"description": "מיגרציה חד-פעמית; הצורה עם-חודש קנונית (החלטת-יו\"ר).",
"dependencies": [],
"details": "INV-ID1",
"status": "pending",
"testStrategy": "",
"parentId": "67"
},
{
"id": 2,
"title": "[GAP-08] הסרת ציטוט-מלא כ-case_number",
"description": "רשומות עם ציטוט מלא כמזהה (legacy).",
"dependencies": [],
"details": "INV-DM2/ID2",
"status": "pending",
"testStrategy": "",
"parentId": "67"
}
]
}
],
"metadata": {

View File

@@ -0,0 +1,613 @@
# FU-2a: Idempotent Ingest + Write-Time Normalization + `searchable` Flag — Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Make ingest idempotent (`ON CONFLICT` upsert), normalize identifiers at the write boundary (type-aware), and add a materialized `searchable` flag — all forward-only, no identifier migration.
**Architecture:** Pure-code + one schema-additive migration (V21) in `db.py`. The two `create_*_case_law` functions move from app-level SELECT-then-INSERT/UPDATE to atomic `INSERT … ON CONFLICT … DO UPDATE` against the existing V15 partial unique indexes (predicate repeated). A new `_canonical_case_number` normalizes at write for identifier-keyed corpora (internal/cases), not for external (citation is its id). A new `searchable` boolean is recomputed from the completeness contract on ingest/metadata completion; the search-layer filter is gated behind a dry-run.
**Tech Stack:** Python 3.12, asyncpg, PostgreSQL (pgvector) at localhost:5433, pytest offline, local `.venv` at `mcp-server/.venv`.
**Spec:** [docs/superpowers/specs/2026-05-30-fu2a-idempotent-ingest-design.md](../specs/2026-05-30-fu2a-idempotent-ingest-design.md)
**Run tests:** `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_idempotent_ingest.py -v`
**DB smoke (real Postgres):** source `~/.env`, connect to `localhost:5433` db `legal_ai` (see Task 6).
---
## File Structure
- **Modify** `mcp-server/src/legal_mcp/services/db.py`:
- add `_canonical_case_number(s)` (pure) near `_normalize_case_number` (~line 1196).
- add pure `_compute_searchable(row, has_embedded_chunk)` + async `recompute_searchable(...)`.
- add `SCHEMA_V21_SQL` (after V20, ~line 1094) + wire into `_run_schema_migrations` (~line 1119).
- normalize at write in `create_case`, `create_internal_committee_decision` (NOT `create_external_case_law`).
- convert `create_external_case_law` + `create_internal_committee_decision` to `ON CONFLICT … DO UPDATE`.
- **Modify** `mcp-server/src/legal_mcp/services/ingest.py`: call `db.recompute_searchable(case_law_id)` after statuses are set (uniform, both types).
- **Modify** the search layer (`services/hybrid_search.py` and/or `db.py` search functions) — gated `searchable = true` filter (Task 6, only if dry-run is clean).
- **Create** `mcp-server/tests/test_idempotent_ingest.py` — offline tests for the pure pieces + ingest wiring.
**Unchanged:** public signatures of `ingest_precedent`/`ingest_internal_decision` (FU-1) and the DB-create parameter lists. Normalization/upsert live inside the write boundary.
---
## Task 1: Failing tests (pure logic + ingest wiring)
**Files:** Create `mcp-server/tests/test_idempotent_ingest.py`
- [ ] **Step 1: Write the failing tests**
```python
"""FU-2a: idempotent ingest + write-time normalization + searchable flag.
Offline tests for the *pure* pieces (canonical normalization, completeness
predicate) and ingest wiring. The real ON CONFLICT upsert is verified by a
DB smoke test against localhost:5433 (see plan Task 6), since it requires a
live Postgres partial unique index.
"""
from __future__ import annotations
import asyncio
from uuid import uuid4
import pytest
from legal_mcp.services import db, ingest
def _run(coro):
return asyncio.run(coro)
# ── GAP-06: canonical normalization (pure, deterministic) ──────────────
@pytest.mark.parametrize("raw,expected", [
("ערר 8137/24", "8137-24"),
(" עע\"מ 1/20 ", "1-20"),
("8126-03-25", "8126-03-25"), # month segment preserved
("בל\"מ 1010-01-25", "1010-01-25"),
("8047/23", "8047-23"),
])
def test_canonical_case_number(raw, expected):
assert db._canonical_case_number(raw) == expected
def test_canonical_does_not_invent_month():
# No month in input → none added (X1 §1).
assert db._canonical_case_number("8126/24") == "8126-24"
# ── GAP-13: completeness predicate (pure) ──────────────────────────────
def _complete_row():
return {
"case_number": "8047-23", "case_name": "פלוני נ' הוועדה",
"practice_area": "rishuy_uvniya", "source_kind": "internal_committee",
"extraction_status": "completed", "headnote": "תקציר",
"summary": "", "subject_tags": [],
}
def test_compute_searchable_true_when_complete():
assert db._compute_searchable(_complete_row(), has_embedded_chunk=True) is True
def test_compute_searchable_false_without_embedded_chunk():
assert db._compute_searchable(_complete_row(), has_embedded_chunk=False) is False
def test_compute_searchable_false_without_metadata():
row = _complete_row()
row["headnote"] = ""; row["summary"] = ""; row["subject_tags"] = []
assert db._compute_searchable(row, has_embedded_chunk=True) is False
def test_compute_searchable_false_when_extraction_incomplete():
row = _complete_row(); row["extraction_status"] = "pending"
assert db._compute_searchable(row, has_embedded_chunk=True) is False
def test_compute_searchable_false_without_core_fields():
row = _complete_row(); row["practice_area"] = ""
assert db._compute_searchable(row, has_embedded_chunk=True) is False
# ── ingest wires in recompute_searchable (both types) ──────────────────
def test_ingest_calls_recompute_searchable(monkeypatch, tmp_path):
calls = {"recompute": [], "meta": [], "hal": []}
async def _extract_text(path): return ("text", 1, [0])
monkeypatch.setattr(ingest.extractor, "extract_text", _extract_text)
monkeypatch.setattr(ingest.extractor, "strip_nevo_preamble", lambda t: t)
monkeypatch.setattr(ingest.chunker, "chunk_document",
lambda t, page_offsets=None: [type("C", (), {
"chunk_index": 0, "content": "c", "section_type": "b",
"page_number": 1})()])
async def _embed(texts, input_type="document"): return [[0.0] * 8 for _ in texts]
monkeypatch.setattr(ingest.embeddings, "embed_texts", _embed)
async def _store(cid, dicts): return len(dicts)
monkeypatch.setattr(ingest.db, "store_precedent_chunks", _store)
async def _create_internal(**kw): return {"id": uuid4()}
monkeypatch.setattr(ingest.db, "create_internal_committee_decision", _create_internal)
async def _noop(*a, **k): return None
monkeypatch.setattr(ingest.db, "set_case_law_extraction_status", _noop)
monkeypatch.setattr(ingest.db, "set_case_law_halacha_status", _noop)
monkeypatch.setattr(ingest.db, "request_metadata_extraction",
lambda cid: calls["meta"].append(cid) or _noop())
monkeypatch.setattr(ingest.db, "request_halacha_extraction",
lambda cid: calls["hal"].append(cid) or _noop())
async def _recompute(cid): calls["recompute"].append(cid)
monkeypatch.setattr(ingest.db, "recompute_searchable", _recompute)
monkeypatch.setattr(ingest.config, "PARENT_DOC_RETRIEVAL_ENABLED", False)
monkeypatch.setattr(ingest.config, "MULTIMODAL_ENABLED", False)
from legal_mcp.services import internal_decisions
_run(internal_decisions.ingest_internal_decision(
case_number="8047/23", text="t", chair_name="x", practice_area="rishuy_uvniya"))
assert len(calls["recompute"]) == 1, "ingest must recompute searchable after success"
```
- [ ] **Step 2: Run to verify failure**
Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_idempotent_ingest.py -v`
Expected: FAIL — `AttributeError: module 'legal_mcp.services.db' has no attribute '_canonical_case_number'` (and `_compute_searchable`, `recompute_searchable`).
- [ ] **Step 3: Commit**
```bash
cd ~/legal-ai
git add mcp-server/tests/test_idempotent_ingest.py
git commit -m "test(ingest): failing tests for idempotent ingest + searchable (FU-2a)"
```
---
## Task 2: `_canonical_case_number` + write-time normalization
**Files:** Modify `mcp-server/src/legal_mcp/services/db.py`
- [ ] **Step 1: Add `_canonical_case_number` next to `_normalize_case_number` (~line 1212)**
```python
def _canonical_case_number(s: str) -> str:
"""Canonical write-time form per X1 §1: trim · prefix-strip · '/''-'.
Deterministic and format-only — does NOT add or remove a month segment.
Used at the write boundary for identifier-keyed corpora (internal
committee decisions, active cases). NOT for external precedents, whose
canonical identifier is the full citation.
"""
s = (s or "").strip()
m = re.search(r"\d", s)
if m:
s = s[m.start():]
return s.strip().replace("/", "-")
```
- [ ] **Step 2: Normalize at write in `create_case` (~line 1158)**
Change the INSERT's `case_number` binding to normalized form. Replace `case_id, case_number, title,` with:
```python
case_id, _canonical_case_number(case_number), title,
```
- [ ] **Step 3: Normalize at write in `create_internal_committee_decision` (top of function body, ~line 2649)**
Immediately after `pool = await get_pool()`, add:
```python
case_number = _canonical_case_number(case_number)
```
(Do NOT add this to `create_external_case_law` — external keeps its citation verbatim; that function only `.strip()`s, which the caller adapter already does.)
- [ ] **Step 4: Run normalization tests**
Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_idempotent_ingest.py -k "canonical" -v`
Expected: `test_canonical_case_number` (5 cases) + `test_canonical_does_not_invent_month` PASS.
- [ ] **Step 5: Commit**
```bash
cd ~/legal-ai
git add mcp-server/src/legal_mcp/services/db.py
git commit -m "feat(ingest): write-time canonical case_number normalization (GAP-06, FU-2a)"
```
---
## Task 3: Convert both create functions to `ON CONFLICT DO UPDATE`
**Files:** Modify `mcp-server/src/legal_mcp/services/db.py`
- [ ] **Step 1: Replace `create_external_case_law` body (lines 2566-2624, from `pool = await get_pool()` to `return _row_to_case_law(row)`)**
```python
pool = await get_pool()
tags_json = json.dumps(subject_tags or [], ensure_ascii=False)
async with pool.acquire() as conn:
# Atomic upsert on the V15 partial unique index
# uq_case_law_external_number (case_number) WHERE source_kind <> 'internal_committee'.
# The predicate is repeated in ON CONFLICT (required for partial indexes).
# This also subsumes the old cited_only→external_upload promotion: a
# cited_only row with the same case_number conflicts and is promoted by
# DO UPDATE. Scoped to the external partial index, so an internal row with
# the same number is NOT touched (the old SELECT-without-source_kind could
# wrongly promote it).
row = await conn.fetchrow(
"""
INSERT INTO case_law (
case_number, case_name, court, date, subject_tags,
summary, key_quote, full_text, source_url,
source_kind, document_id, extraction_status,
halacha_extraction_status, practice_area, appeal_subtype,
headnote, source_type, precedent_level, is_binding
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9,
'external_upload', $10, 'processing', 'pending',
$11, $12, $13, $14, $15, $16
)
ON CONFLICT (case_number) WHERE source_kind <> 'internal_committee'
DO UPDATE SET
case_name = EXCLUDED.case_name,
court = COALESCE(NULLIF(EXCLUDED.court, ''), case_law.court),
date = COALESCE(EXCLUDED.date, case_law.date),
practice_area = EXCLUDED.practice_area,
appeal_subtype = EXCLUDED.appeal_subtype,
subject_tags = EXCLUDED.subject_tags,
summary = COALESCE(NULLIF(EXCLUDED.summary, ''), case_law.summary),
headnote = EXCLUDED.headnote,
key_quote = COALESCE(NULLIF(EXCLUDED.key_quote, ''), case_law.key_quote),
full_text = EXCLUDED.full_text,
source_url = COALESCE(NULLIF(EXCLUDED.source_url, ''), case_law.source_url),
source_type = EXCLUDED.source_type,
precedent_level = EXCLUDED.precedent_level,
is_binding = EXCLUDED.is_binding,
document_id = COALESCE(EXCLUDED.document_id, case_law.document_id),
source_kind = 'external_upload',
extraction_status = 'processing',
halacha_extraction_status = 'pending'
RETURNING *
""",
case_number, case_name, court, decision_date, tags_json,
summary, key_quote, full_text, source_url,
document_id, practice_area, appeal_subtype, headnote,
source_type, precedent_level, is_binding,
)
return _row_to_case_law(row)
```
- [ ] **Step 2: Replace `create_internal_committee_decision` body (lines 2649-2708)**
```python
pool = await get_pool()
case_number = _canonical_case_number(case_number)
tags_json = json.dumps(subject_tags or [], ensure_ascii=False)
async with pool.acquire() as conn:
# Atomic upsert on V15 partial unique index
# uq_case_law_internal_number_proc (case_number, proceeding_type)
# WHERE source_kind = 'internal_committee'. Predicate repeated for the
# partial index. Replaces the old SELECT-then-INSERT/UPDATE (race-prone).
row = await conn.fetchrow(
"""
INSERT INTO case_law (
case_number, case_name, court, date, chair_name, district,
subject_tags, summary, full_text,
source_kind, source_type, document_id,
extraction_status, halacha_extraction_status,
practice_area, appeal_subtype, is_binding, proceeding_type
) VALUES (
$1, $2, $3, $4, $5, $6,
$7, $8, $9,
'internal_committee', 'appeals_committee', $10,
'processing', 'pending',
$11, $12, $13, $14
)
ON CONFLICT (case_number, proceeding_type)
WHERE source_kind = 'internal_committee'
DO UPDATE SET
case_name = EXCLUDED.case_name,
court = COALESCE(NULLIF(EXCLUDED.court, ''), case_law.court),
date = COALESCE(EXCLUDED.date, case_law.date),
chair_name = COALESCE(NULLIF(EXCLUDED.chair_name, ''), case_law.chair_name),
district = COALESCE(NULLIF(EXCLUDED.district, ''), case_law.district),
practice_area = EXCLUDED.practice_area,
appeal_subtype = EXCLUDED.appeal_subtype,
subject_tags = EXCLUDED.subject_tags,
summary = COALESCE(NULLIF(EXCLUDED.summary, ''), case_law.summary),
full_text = EXCLUDED.full_text,
source_type = 'appeals_committee',
source_kind = 'internal_committee',
is_binding = EXCLUDED.is_binding,
document_id = COALESCE(EXCLUDED.document_id, case_law.document_id),
extraction_status = 'processing',
halacha_extraction_status = 'pending'
RETURNING *
""",
case_number, case_name, court, decision_date, chair_name, district,
tags_json, summary, full_text,
document_id, practice_area, appeal_subtype, is_binding,
proceeding_type,
)
return _row_to_case_law(row)
```
- [ ] **Step 3: Verify import + no syntax error**
Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -c "from legal_mcp.services import db; print('db imports')"`
Expected: prints `db imports`.
- [ ] **Step 4: Commit**
```bash
cd ~/legal-ai
git add mcp-server/src/legal_mcp/services/db.py
git commit -m "feat(ingest): atomic ON CONFLICT upsert in create_*_case_law (GAP-03, FU-2a)"
```
---
## Task 4: V21 migration — `searchable` column + recompute
**Files:** Modify `mcp-server/src/legal_mcp/services/db.py`
- [ ] **Step 1: Add `SCHEMA_V21_SQL` after `SCHEMA_V20_SQL` (~line 1094)**
```python
# ── V21: explicit `searchable` flag (GAP-13 / INV-DM1) ─────────────
# Materialized completeness flag — a case_law row is exposed to search only
# when it satisfies the completeness contract (02-data-model §2a). Recomputed
# on ingest/metadata completion via recompute_searchable(); not inferred at
# query time. Default false so a freshly-inserted row is excluded until proven
# complete. Health-check surfaces count(*) FILTER (WHERE NOT searchable).
SCHEMA_V21_SQL = """
ALTER TABLE case_law ADD COLUMN IF NOT EXISTS searchable boolean NOT NULL DEFAULT false;
CREATE INDEX IF NOT EXISTS idx_case_law_searchable ON case_law (searchable);
"""
```
- [ ] **Step 2: Wire V21 into `_run_schema_migrations` (~line 1119) and bump the log line**
After `await conn.execute(SCHEMA_V20_SQL)` add:
```python
await conn.execute(SCHEMA_V21_SQL)
```
Change the log line `"Database schema initialized (v1-v20)"``"Database schema initialized (v1-v21)"`.
- [ ] **Step 3: Add `_compute_searchable` (pure) + `recompute_searchable` (async) near the case_law helpers (after `create_internal_committee_decision`, ~line 2709)**
```python
def _compute_searchable(row: dict, has_embedded_chunk: bool) -> bool:
"""Completeness contract (INV-DM1 / 02-data-model §2a).
A row is searchable IFF: canonical id present · case_name/practice_area/
source_kind present · ≥1 chunk with a non-null embedding · extraction
completed · metadata non-empty (≥1 of headnote/summary/subject_tags).
Pure — `has_embedded_chunk` is supplied by the caller (cross-table check).
"""
if not has_embedded_chunk:
return False
if (row.get("extraction_status") or "") != "completed":
return False
if not (row.get("case_number") or "").strip():
return False
if not (row.get("case_name") or "").strip():
return False
if not (row.get("practice_area") or "").strip():
return False
if not (row.get("source_kind") or "").strip():
return False
tags = row.get("subject_tags") or []
has_meta = bool((row.get("headnote") or "").strip()) \
or bool((row.get("summary") or "").strip()) \
or (len(tags) > 0)
return has_meta
async def recompute_searchable(case_law_id: "UUID | str | None" = None) -> int:
"""Recompute and persist the `searchable` flag. Idempotent / reversible.
If case_law_id is None, recompute ALL rows (used by the V21 backfill and
the dry-run). Returns the number of rows now marked searchable=true.
"""
pool = await get_pool()
async with pool.acquire() as conn:
if case_law_id is not None:
cid = case_law_id if isinstance(case_law_id, UUID) else UUID(str(case_law_id))
rows = await conn.fetch(
"SELECT * FROM case_law WHERE id = $1", cid)
else:
rows = await conn.fetch("SELECT * FROM case_law")
n_true = 0
for r in rows:
row = dict(r)
# subject_tags is stored jsonb; _row_to_case_law parses it, but here
# we read raw — normalize to a list length check.
tags = row.get("subject_tags")
if isinstance(tags, str):
try:
tags = json.loads(tags)
except (ValueError, TypeError):
tags = []
row["subject_tags"] = tags or []
has_chunk = await conn.fetchval(
"SELECT EXISTS(SELECT 1 FROM precedent_chunks "
"WHERE case_law_id = $1 AND embedding IS NOT NULL)", row["id"])
val = _compute_searchable(row, bool(has_chunk))
await conn.execute(
"UPDATE case_law SET searchable = $2 WHERE id = $1", row["id"], val)
if val:
n_true += 1
return n_true
```
- [ ] **Step 4: Run the completeness-predicate tests**
Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_idempotent_ingest.py -k "searchable and not ingest" -v`
Expected: all `test_compute_searchable_*` PASS.
- [ ] **Step 5: Commit**
```bash
cd ~/legal-ai
git add mcp-server/src/legal_mcp/services/db.py
git commit -m "feat(data-model): V21 searchable flag + recompute_searchable (GAP-13, FU-2a)"
```
---
## Task 5: Wire `recompute_searchable` into ingest
**Files:** Modify `mcp-server/src/legal_mcp/services/ingest.py`
- [ ] **Step 1: Call recompute after statuses are set in `ingest_document`**
In `ingest.py`, find the block (added by FU-1) that sets statuses + queues extraction:
```python
await db.set_case_law_extraction_status(case_law_id, "completed")
await db.set_case_law_halacha_status(case_law_id, "pending")
await db.request_metadata_extraction(case_law_id)
await db.request_halacha_extraction(case_law_id)
```
Immediately AFTER `request_halacha_extraction`, add:
```python
await db.recompute_searchable(case_law_id)
```
> Rationale: at this point chunks+embeddings are stored and extraction_status is
> completed, so the completeness predicate is meaningful. Metadata may still be
> pending (queued), so the row may compute searchable=false until metadata fills —
> the metadata extractor also calls recompute (Task 5 Step 2).
- [ ] **Step 2: Call recompute after metadata extraction fills fields**
In `mcp-server/src/legal_mcp/services/precedent_metadata_extractor.py`, find `extract_and_apply`'s success path (where it persists the filled metadata fields). After the DB update that writes the extracted metadata, add a call:
```python
await db.recompute_searchable(case_law_id)
```
(Import `db` is already present in that module; if not, add `from legal_mcp.services import db`. Confirm by reading the file's imports first.)
- [ ] **Step 3: Run the ingest-wiring test**
Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_idempotent_ingest.py -k "ingest_calls_recompute" -v`
Expected: `test_ingest_calls_recompute_searchable` PASS.
- [ ] **Step 4: Commit**
```bash
cd ~/legal-ai
git add mcp-server/src/legal_mcp/services/ingest.py mcp-server/src/legal_mcp/services/precedent_metadata_extractor.py
git commit -m "feat(ingest): recompute searchable on ingest + metadata completion (GAP-13, FU-2a)"
```
---
## Task 6: DB smoke + dry-run + GATED search filter
**Files:** Modify search layer ONLY if dry-run is clean (see Step 4).
- [ ] **Step 1: Apply the V21 migration to the local DB and smoke-test upsert idempotency**
Run (sources env, exercises real Postgres):
```bash
cd ~/legal-ai && set -a && source ~/.env && set +a
cd mcp-server && .venv/bin/python -c "
import asyncio, uuid
from legal_mcp.services import db
async def main():
await db.get_pool() # runs migrations incl V21
# idempotent internal upsert: same (case_number, proceeding_type) twice
cn = 'ZZ9999/24'
r1 = await db.create_internal_committee_decision(case_number=cn, case_name='t', full_text='x', practice_area='rishuy_uvniya')
r2 = await db.create_internal_committee_decision(case_number=cn, case_name='t2', full_text='x2', practice_area='rishuy_uvniya')
assert r1['id'] == r2['id'], 'upsert must update, not duplicate'
# cleanup
pool = await db.get_pool()
async with pool.acquire() as c:
await c.execute(\"DELETE FROM case_law WHERE case_number = 'ZZ9999-24'\")
print('UPSERT IDEMPOTENT OK; normalized stored as ZZ9999-24')
asyncio.run(main())
"
```
Expected: `UPSERT IDEMPOTENT OK` and no duplicate. (Note: `ZZ9999/24` normalizes to `ZZ9999-24` — confirms write-time normalization too.)
- [ ] **Step 2: Backfill the `searchable` flag (recompute, reversible)**
```bash
cd ~/legal-ai && set -a && source ~/.env && set +a
cd mcp-server && .venv/bin/python -c "
import asyncio
from legal_mcp.services import db
async def main():
n = await db.recompute_searchable()
print('recompute_searchable: rows now searchable =', n)
asyncio.run(main())
"
```
- [ ] **Step 3: Dry-run report — which rows would drop from search if the filter is enabled**
```bash
cd ~/legal-ai && set -a && source ~/.env && set +a
PGPASSWORD="$POSTGRES_PASSWORD" psql "host=$POSTGRES_HOST port=$POSTGRES_PORT dbname=$POSTGRES_DB user=$POSTGRES_USER" -c "
SELECT source_kind,
count(*) AS total,
count(*) FILTER (WHERE NOT searchable) AS would_drop
FROM case_law GROUP BY source_kind ORDER BY source_kind;"
```
Report the table to the controller. **Decision gate:** if `would_drop` includes legitimate, currently-findable precedents (e.g. external_upload / internal_committee rows that users rely on), DO NOT enable the search filter in Step 4 — stop and report; the filter waits for FU-2b. If `would_drop` is only genuinely-incomplete rows, proceed.
- [ ] **Step 4: (GATED) Enable `searchable = true` filter in the search layer**
ONLY if Step 3 is clean. Read `mcp-server/src/legal_mcp/services/hybrid_search.py` to find the `case_law` WHERE clauses in `search_precedent_library_hybrid` / `search_documents_hybrid`. Add `AND cl.searchable = true` (alias as used in that query) to the case_law-joined precedent search paths. Add a focused test asserting a non-searchable row is excluded (monkeypatch or DB smoke). If deferred, write a one-line note in the spec §7 that the filter is pending FU-2b and skip.
- [ ] **Step 5: Add health-check visibility**
Find the health-check endpoint/function (search `def health` / `processing_status` in `web/app.py` or `tools/`). Add a field `non_searchable_case_law = SELECT count(*) FROM case_law WHERE NOT searchable`. Keep it a single cheap COUNT.
- [ ] **Step 6: Commit**
```bash
cd ~/legal-ai
git add -A mcp-server/ web/
git commit -m "feat(retrieval): gated searchable filter + health-check visibility (GAP-13, FU-2a)"
```
---
## Task 7: Full suite + smoke + lint + TaskMaster
- [ ] **Step 1: Full test suite**
Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/ -q`
Expected: all pass (the FU-1 77 + new FU-2a tests). Report the summary line.
- [ ] **Step 2: Smoke-import**
Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -c "from legal_mcp.services import db, ingest, precedent_library, internal_decisions; print('clean')"`
Expected: `clean`.
- [ ] **Step 3: Lint changed files (if ruff available)**
Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m ruff check src/legal_mcp/services/db.py src/legal_mcp/services/ingest.py 2>/dev/null; echo "exit=$?"`
Expected: clean or "ruff not available".
- [ ] **Step 4: Mark TaskMaster #60 + subtasks done**
Controller handles this (edit `.taskmaster/tasks/tasks.json`, verify via MCP get_task). Subtasks 60.1 (GAP-03), 60.2 (GAP-06), 60.5 (GAP-13).
---
## Self-Review Notes
- **GAP-03** → Task 3 (ON CONFLICT both functions). **GAP-06** → Task 2 (`_canonical_case_number` + write-time, type-aware). **GAP-13** → Tasks 4-5 (column + recompute + wiring) and gated Task 6 (filter).
- **No identifier migration** — FU-2b (#67) owns GAP-07/08. The V21 backfill only sets a derived, reversible flag.
- **Gated search filter** (Task 6 Step 3-4): the behavior-visible change is contingent on a clean dry-run; otherwise deferred. Surface the dry-run table to the user.
- **Offline-test limitation:** ON CONFLICT needs real Postgres → verified by Task 6 Step 1 smoke; offline tests cover the pure logic (normalize, completeness) and ingest wiring.
- **Type-consistency:** `_canonical_case_number`, `_compute_searchable(row, has_embedded_chunk)`, `recompute_searchable(case_law_id=None)` — names used identically in tests (Task 1) and impl (Tasks 2,4).

View File

@@ -0,0 +1,140 @@
# FU-2a — Idempotent Ingest + Write-Time Normalization + `searchable` Flag — עיצוב
**סטטוס:** מאושר-לעיצוב · **תאריך:** 2026-05-30 · **ענף:** TBD
**מכסה:** GAP-03, GAP-06, GAP-13 · **מספק:** INV-ING2, INV-G3, INV-G1, INV-ID1, INV-DM1
**מקורות:** [01-ingest.md](../../spec/01-ingest.md), [02-data-model.md](../../spec/02-data-model.md), [X1-identifiers.md](../../spec/X1-identifiers.md), [gap-audit.md](../../spec/gap-audit.md)
**משימה:** TaskMaster #60 · **תלוי ב:** FU-1 (#59) · **סוג:** pure-code (schema-additive)
**מיגרציה:** אין מיגרציית-מזהים (GAP-07/08 פוצלו ל-#67 / FU-2b). דגל `searchable` נגזר ו-recompute-בלבד.
---
## 1. היקף ומה מחוץ להיקף
FU-2 פוצל (החלטת-יו"ר 2026-05-30) לאחר שבדיקת-DB גילתה נתונים מבולגנים מהצפוי:
~52/56 רשומות `internal_committee` מחזיקות **ציטוט מלא** ב-`case_number`, יש ≥1 כפילות
(`8047-23`), ו-GAP-07 ("with-month canonical") דורש את המספר הרשמי שהוקצה — ידע-יו"ר.
- **בהיקף (FU-2a, כאן):** GAP-03 (upsert idempotent), GAP-06 (נרמול-בכתיבה), GAP-13 (`searchable`).
הכל **pure-code / schema-additive**, משנה התנהגות *קדימה*, אפס מוטציה של מזהים קיימים.
- **מחוץ להיקף (FU-2b, #67):** GAP-07 (תיאום מזהים מעורבים), GAP-08 (ניקוי ציטוט-כמזהה) —
מיגרציית-נתונים שמערבת dedup, סקירת-יו"ר per-record, גיבוי, reversibility.
**אינטראקציה FU-2a↔FU-2b (מתועד):** נרמול-בכתיבה חל רק על כתיבות *חדשות*. רשומות-עבר עם
ציטוט-מלא לא משתנות עד FU-2b. קליטה-חוזרת של רשומת-עבר מבולגנת תיצור רשומה *נקייה* חדשה
(לא תתנגש על המחרוזת השונה) — FU-2b יאחד. זה עקבי עם forward-only של FU-1.
## 2. הכרעות אדריכליות (מאומתות ≥3 מקורות)
| החלטה | נימוק | מקורות |
|-------|--------|--------|
| `INSERT … ON CONFLICT DO UPDATE` במקום SELECT-then-INSERT/UPDATE | אטומי, נטול race תחת Read-Committed; ה-SELECT-then-write הוא read-modify-write קלאסי | PostgreSQL INSERT docs; QueryPlane; on-systems.tech |
| **לחזור על predicate של ה-partial-index ב-ON CONFLICT** | V15 משתמש ב-partial unique indexes; Postgres דורש את ה-predicate ב-conflict target | PostgreSQL INSERT docs (§ON CONFLICT); QueryPlane gotchas |
| נרמול case_number **בכתיבה**, type-aware | נרמול הוא אחריות-גבול-קלט; ערכים שווי-משמעות → פלט זהה. פסיקה-חיצונית: הציטוט *הוא* המזהה → לא לחתוך | DDD value-objects (Medium/dev.to); gojko.net |
| דגל `searchable` **materialized** ונגזר-מחדש, לא מוסק בכל query | reify את חוזה-השלמות; חייב להיות נראה ל-health-check (לא הסקה סמויה) | DevIQ MISU; functional-architecture.org; Stemmler |
## 3. הקבצים
- **Modify** `mcp-server/src/legal_mcp/services/db.py`:
- `create_external_case_law` — להמיר ל-`ON CONFLICT` (target: `(case_number) WHERE source_kind <> 'internal_committee'`); זה גם מטפל בקידום `cited_only``external_upload` (אותו partial-index). לא לחתוך את ה-citation (זהו המזהה).
- `create_internal_committee_decision` — להמיר ל-`ON CONFLICT (case_number, proceeding_type) WHERE source_kind = 'internal_committee'`; לנרמל `case_number` בכניסה.
- `create_case` — לנרמל `case_number` בכניסה (כתיבה).
- הוספת helper `_canonical_case_number(s)` (שם מפורש; עוטף את הטרנספורם הדטרמיניסטי trim·prefix-strip·/→- של X1). `_normalize_case_number` הקיים (read-time) נשאר כ-shim.
- מיגרציית-schema **V21**: `ALTER TABLE case_law ADD COLUMN searchable boolean NOT NULL DEFAULT false`.
- פונקציה `recompute_searchable(case_law_id|all)` — נגזרת מחוזה-השלמות; נקראת בסיום-קליטה ובסיום-חילוץ-metadata.
- **Modify** `mcp-server/src/legal_mcp/services/ingest.py` — בסיום הצלחת הקליטה, לקרוא `db.recompute_searchable(case_law_id)` (אחיד לכל סוג; אחרי setting statuses).
- **Test** `mcp-server/tests/test_idempotent_ingest.py` (חדש) — offline, monkeypatched.
**גבול:** אין שינוי לחתימות הציבוריות של `ingest_precedent`/`ingest_internal_decision` (FU-1).
הנרמול וה-upsert יושבים בשכבת-ה-DB (גבול-הכתיבה), שקופים לקוראים.
## 4. נרמול type-aware (GAP-06)
`_canonical_case_number(s)` — דטרמיניסטי, תואם X1 §1, **לא מוסיף/מסיר חודש**:
```
trim → strip prefix לפני הספרה הראשונה → להחליף '/' ב-'-'
```
| נקודת-כתיבה | מדיניות | נימוק |
|--------------|---------|--------|
| `create_internal_committee_decision` | `_canonical_case_number(case_number)` | המזהה הקנוני = מספר-בסיס מנורמל |
| `create_case` | `_canonical_case_number(case_number)` | תיק פעיל — אותו כלל |
| `create_external_case_law` | `.strip()` בלבד (ללא prefix-strip) | פסיקה חיצונית: ה-citation הוא המזהה הקנוני (X1 §1); חיתוך היה הורס אותו |
> נרמול מטפל ב-prefix+separator בלבד. קלט שהוא ציטוט-מלא (party names, נבו) **לא** מנוקה ל-bare
> ע"י הנרמול — זה GAP-08/FU-2b. FU-2a מבטיח שקלט נקי-יחסית נשמר בצורה קנונית.
## 5. Idempotent upsert (GAP-03)
שתי פונקציות-ה-create עוברות מ-SELECT-then-INSERT/UPDATE ל-`INSERT … ON CONFLICT … DO UPDATE`,
עם **חזרה על ה-predicate** של ה-partial-index (V15):
- **internal:** `ON CONFLICT (case_number, proceeding_type) WHERE source_kind = 'internal_committee' DO UPDATE SET …`
- **external:** `ON CONFLICT (case_number) WHERE source_kind <> 'internal_committee' DO UPDATE SET …`
— מחליף את לוגיקת ה-SELECT הקיימת, **כולל** קידום `cited_only``external_upload` (אותה partial-
index חלה על שניהם; ה-DO UPDATE מקדם את source_kind וממלא שדות חסרים).
**`DO UPDATE` ממוקד:** רק שדות-קלט לא-ריקים דורסים (לשמר ערכים קיימים; `COALESCE(EXCLUDED.x, case_law.x)`),
ולא לדרוס מטא-דאטה שמולא ע"י חילוץ-LLM. אם ל-`case_law` יש טריגרי-`updated_at` — לסנן עם `WHERE`
על שינוי בפועל (gotcha מהמחקר). re-embed בקליטה-חוזרת = INV-ING4, שייך ל-FU-3 — כאן רק upsert-הרשומה.
## 6. דגל `searchable` (GAP-13)
עמודה חדשה `case_law.searchable boolean NOT NULL DEFAULT false`. **נגזרת** מחוזה-השלמות
(02-data-model §2a / INV-DM1), לא מוסקת ב-query:
```
searchable = (
case_number/citation קנוני לא-ריק
AND case_name<>'' AND practice_area<>'' AND source_kind<>''
AND EXISTS(precedent_chunk עם embedding NOT NULL)
AND extraction_status='completed'
AND (headnote<>'' OR summary<>'' OR jsonb_array_length(subject_tags)>0)
)
```
- `recompute_searchable(case_law_id)` נקראת בסיום-קליטה (ingest.py) ובסיום `precedent_metadata_extractor`.
- **Backfill (recompute-בלבד, הפיך):** מיגרציה V21 מריצה `recompute_searchable(all)` פעם אחת על רשומות
קיימות. זו גזירה הפיכה (ניתן להריץ שוב כל רגע) — אינה נוגעת במזהים, לא חלק מ-FU-2b.
- שכבת-החיפוש (`search_*`) תסונן ל-`searchable=true`**שינוי-התנהגות מתועד** (ראה §7).
- health-check יחשוף `count(*) FILTER (WHERE NOT searchable)` (זרע ל-GAP-14/FU-5).
## 7. שינויי-התנהגות וסיכון
| שינוי | השפעה | סיכון |
|--------|--------|--------|
| upsert ON CONFLICT | קליטה-חוזרת = update אטומי, לא כפילות; קידום cited_only נשמר | נמוך — מאומת מול partial-index הקיים |
| נרמול-בכתיבה (internal/cases) | קלט חדש נשמר כ-bare מנורמל | נמוך — type-aware; external לא נחתך |
| `searchable` מסנן חיפוש | רשומות שלא עומדות בחוזה-השלמות **לא יוחזרו** | ⚠️ בינוני — backfill עלול לסמן רשומות-עבר כ-non-searchable. **אימות:** להריץ recompute ב-dry-run ולדווח כמה ירדו מהחיפוש *לפני* הפעלת הסינון |
| backfill searchable | דגל נגזר על רשומות קיימות | נמוך — הפיך, recompute-בלבד, לא נוגע במזהים |
**אזהרת-backlog:** ה-rows עם ציטוט-מלא-כמזהה (FU-2b) עשויים בכל זאת לעמוד בחוזה-השלמות (יש להם
chunks+metadata), כך שהסינון לא בהכרח מפיל אותם. ה-dry-run ב-§7 יכמת זאת לפני הפעלה.
## 8. אסטרטגיית בדיקה
`tests/test_idempotent_ingest.py` — offline, monkeypatch ל-DB pool (או בדיקת-SQL מול sqlite-fallback אם קיים בפרויקט; אחרת monkeypatch כמו FU-1). מקרים:
1. `_canonical_case_number`: `"ערר 8137/24"``"8137-24"`, `"8126-03-25"``"8126-03-25"` (חודש נשמר), `" עע\"מ 1/20 "``"1-20"`.
2. נרמול type-aware: internal מנרמל; external **לא** חותך citation.
3. upsert: קליטה כפולה של אותו (case_number, proceeding_type) internal = רשומה אחת (לא שתיים).
4. upsert: קידום `cited_only``external_upload` על אותו case_number = עדכון, לא כפילות.
5. `DO UPDATE` ממוקד: מטא-דאטה קיים לא נדרס ע"י קלט ריק (COALESCE).
6. `recompute_searchable`: רשומה מלאה→true; חסרת-embedding/metadata/extraction→false.
7. ingest קורא recompute_searchable בסיום (שני הסוגים).
> בדיקת ON CONFLICT האמיתית דורשת Postgres. אם אין מסלול-בדיקה מול DB אמיתי בפרויקט,
> הבדיקות יאמתו את בניית-ה-SQL ואת הלוגיקה הטהורה (normalize, completeness predicate) ב-offline,
> ושכבת-ה-SQL תיבדק ב-smoke מול ה-DB המקומי (5433) ידנית בסיום, מתועד בתוכנית.
## 9. סדר-ביצוע
1. בדיקות אדומות (`test_idempotent_ingest.py`).
2. `_canonical_case_number` + נרמול-בכתיבה ב-3 פונקציות ה-create.
3. המרת שתי create ל-`ON CONFLICT … DO UPDATE` (עם predicate חוזר + COALESCE ממוקד).
4. מיגרציה V21: עמודה `searchable` + `recompute_searchable` + backfill recompute.
5. קריאה ל-`recompute_searchable` מ-ingest.py; חשיפת `count FILTER (WHERE NOT searchable)` ב-health-check.
6. **dry-run** של backfill מול DB 5433 → לדווח כמה רשומות יסומנו `searchable=false` ומאילו source_kind.
7. **שער החלטה (gated):** סינון `searchable=true` בשכבת-החיפוש מופעל **רק אם** ה-dry-run מראה
שאף רשומה לגיטימית לא יורדת מהחיפוש. אם רשומות-עבר לגיטימיות היו נופלות (למשל מבולגנות-FU-2b
שעדיין שמישות) — לדחות את הפעלת-הסינון לפולואו-אפ אחרי FU-2b, ולהשאיר את העמודה+health-check בלבד.
(להציף את ממצא ה-dry-run למשתמש לפני הפעלה — שינוי חיפוש הוא פעולה גלויה.)
8. בדיקות ירוקות + smoke מול DB מקומי + lint.

View File

@@ -1094,6 +1094,18 @@ CREATE INDEX IF NOT EXISTS idx_case_law_meta_tsv
"""
# ── V21: explicit `searchable` flag (GAP-13 / INV-DM1) ─────────────
# Materialized completeness flag — a case_law row is exposed to search only
# when it satisfies the completeness contract (02-data-model §2a). Recomputed
# on ingest/metadata completion via recompute_searchable(); not inferred at
# query time. Default false so a freshly-inserted row is excluded until proven
# complete. Health-check surfaces count(*) FILTER (WHERE NOT searchable).
SCHEMA_V21_SQL = """
ALTER TABLE case_law ADD COLUMN IF NOT EXISTS searchable boolean NOT NULL DEFAULT false;
CREATE INDEX IF NOT EXISTS idx_case_law_searchable ON case_law (searchable);
"""
async def _run_schema_migrations(pool: asyncpg.Pool) -> None:
async with pool.acquire() as conn:
await conn.execute(SCHEMA_SQL)
@@ -1117,7 +1129,8 @@ async def _run_schema_migrations(pool: asyncpg.Pool) -> None:
await conn.execute(SCHEMA_V18_SQL)
await conn.execute(SCHEMA_V19_SQL)
await conn.execute(SCHEMA_V20_SQL)
logger.info("Database schema initialized (v1-v20)")
await conn.execute(SCHEMA_V21_SQL)
logger.info("Database schema initialized (v1-v21)")
async def init_schema() -> None:
@@ -1155,7 +1168,7 @@ async def create_case(
hearing_date, notes, expected_outcome,
practice_area, appeal_subtype, proceeding_type)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)""",
case_id, case_number, title,
case_id, _canonical_case_number(case_number), title,
json.dumps(appellants or []),
json.dumps(respondents or []),
subject, property_address, permit_number, committee_type,
@@ -1211,6 +1224,21 @@ def _normalize_case_number(s: str) -> str:
return s.strip().replace("/", "-")
def _canonical_case_number(s: str) -> str:
"""Canonical write-time form per X1 §1: trim · prefix-strip · '/''-'.
Deterministic and format-only — does NOT add or remove a month segment.
Used at the write boundary for identifier-keyed corpora (internal
committee decisions, active cases). NOT for external precedents, whose
canonical identifier is the full citation.
"""
s = (s or "").strip()
m = re.search(r"\d", s)
if m:
s = s[m.start():]
return s.strip().replace("/", "-")
async def get_case_by_number(case_number: str) -> dict | None:
pool = await get_pool()
norm = _normalize_case_number(case_number)
@@ -2566,61 +2594,54 @@ async def create_external_case_law(
pool = await get_pool()
tags_json = json.dumps(subject_tags or [], ensure_ascii=False)
async with pool.acquire() as conn:
existing = await conn.fetchrow(
"SELECT id, source_kind FROM case_law WHERE case_number = $1",
case_number,
)
if existing:
row = await conn.fetchrow(
"""
UPDATE case_law SET
case_name = $2,
court = COALESCE(NULLIF($3, ''), court),
date = COALESCE($4, date),
practice_area = $5,
appeal_subtype = $6,
subject_tags = $7,
summary = COALESCE(NULLIF($8, ''), summary),
headnote = $9,
key_quote = COALESCE(NULLIF($10, ''), key_quote),
full_text = $11,
source_url = COALESCE(NULLIF($12, ''), source_url),
source_type = $13,
precedent_level = $14,
is_binding = $15,
document_id = COALESCE($16, document_id),
source_kind = 'external_upload',
extraction_status = 'processing',
halacha_extraction_status = 'pending'
WHERE id = $1
RETURNING *
""",
existing["id"], case_name, court, decision_date,
practice_area, appeal_subtype, tags_json, summary, headnote,
key_quote, full_text, source_url, source_type,
precedent_level, is_binding, document_id,
)
else:
row = await conn.fetchrow(
"""
INSERT INTO case_law (
case_number, case_name, court, date, subject_tags,
summary, key_quote, full_text, source_url,
source_kind, document_id, extraction_status,
halacha_extraction_status, practice_area, appeal_subtype,
headnote, source_type, precedent_level, is_binding
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9,
'external_upload', $10, 'processing', 'pending',
$11, $12, $13, $14, $15, $16
)
RETURNING *
""",
case_number, case_name, court, decision_date, tags_json,
# Atomic upsert on the V15 partial unique index
# uq_case_law_external_number (case_number) WHERE source_kind <> 'internal_committee'.
# The predicate is repeated in ON CONFLICT (required for partial indexes).
# This also subsumes the old cited_only→external_upload promotion: a
# cited_only row with the same case_number conflicts and is promoted by
# DO UPDATE. Scoped to the external partial index, so an internal row with
# the same number is NOT touched (the old SELECT-without-source_kind could
# wrongly promote it).
row = await conn.fetchrow(
"""
INSERT INTO case_law (
case_number, case_name, court, date, subject_tags,
summary, key_quote, full_text, source_url,
document_id, practice_area, appeal_subtype, headnote,
source_type, precedent_level, is_binding,
source_kind, document_id, extraction_status,
halacha_extraction_status, practice_area, appeal_subtype,
headnote, source_type, precedent_level, is_binding
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9,
'external_upload', $10, 'processing', 'pending',
$11, $12, $13, $14, $15, $16
)
ON CONFLICT (case_number) WHERE source_kind <> 'internal_committee'
DO UPDATE SET
case_name = EXCLUDED.case_name,
court = COALESCE(NULLIF(EXCLUDED.court, ''), case_law.court),
date = COALESCE(EXCLUDED.date, case_law.date),
practice_area = EXCLUDED.practice_area,
appeal_subtype = EXCLUDED.appeal_subtype,
subject_tags = EXCLUDED.subject_tags,
summary = COALESCE(NULLIF(EXCLUDED.summary, ''), case_law.summary),
headnote = EXCLUDED.headnote,
key_quote = COALESCE(NULLIF(EXCLUDED.key_quote, ''), case_law.key_quote),
full_text = EXCLUDED.full_text,
source_url = COALESCE(NULLIF(EXCLUDED.source_url, ''), case_law.source_url),
source_type = EXCLUDED.source_type,
precedent_level = EXCLUDED.precedent_level,
is_binding = EXCLUDED.is_binding,
document_id = COALESCE(EXCLUDED.document_id, case_law.document_id),
source_kind = 'external_upload',
extraction_status = 'processing',
halacha_extraction_status = 'pending'
RETURNING *
""",
case_number, case_name, court, decision_date, tags_json,
summary, key_quote, full_text, source_url,
document_id, practice_area, appeal_subtype, headnote,
source_type, precedent_level, is_binding,
)
return _row_to_case_law(row)
@@ -2647,67 +2668,123 @@ async def create_internal_committee_decision(
filed against an existing appeal with the same number).
"""
pool = await get_pool()
case_number = _canonical_case_number(case_number)
tags_json = json.dumps(subject_tags or [], ensure_ascii=False)
async with pool.acquire() as conn:
existing = await conn.fetchrow(
"SELECT id FROM case_law "
"WHERE case_number = $1 AND proceeding_type = $2 "
" AND source_kind = 'internal_committee'",
case_number, proceeding_type,
# Atomic upsert on V15 partial unique index
# uq_case_law_internal_number_proc (case_number, proceeding_type)
# WHERE source_kind = 'internal_committee'. Predicate repeated for the
# partial index. Replaces the old SELECT-then-INSERT/UPDATE (race-prone).
row = await conn.fetchrow(
"""
INSERT INTO case_law (
case_number, case_name, court, date, chair_name, district,
subject_tags, summary, full_text,
source_kind, source_type, document_id,
extraction_status, halacha_extraction_status,
practice_area, appeal_subtype, is_binding, proceeding_type
) VALUES (
$1, $2, $3, $4, $5, $6,
$7, $8, $9,
'internal_committee', 'appeals_committee', $10,
'processing', 'pending',
$11, $12, $13, $14
)
ON CONFLICT (case_number, proceeding_type)
WHERE source_kind = 'internal_committee'
DO UPDATE SET
case_name = EXCLUDED.case_name,
court = COALESCE(NULLIF(EXCLUDED.court, ''), case_law.court),
date = COALESCE(EXCLUDED.date, case_law.date),
chair_name = COALESCE(NULLIF(EXCLUDED.chair_name, ''), case_law.chair_name),
district = COALESCE(NULLIF(EXCLUDED.district, ''), case_law.district),
practice_area = EXCLUDED.practice_area,
appeal_subtype = EXCLUDED.appeal_subtype,
subject_tags = EXCLUDED.subject_tags,
summary = COALESCE(NULLIF(EXCLUDED.summary, ''), case_law.summary),
full_text = EXCLUDED.full_text,
source_type = 'appeals_committee',
source_kind = 'internal_committee',
is_binding = EXCLUDED.is_binding,
document_id = COALESCE(EXCLUDED.document_id, case_law.document_id),
extraction_status = 'processing',
halacha_extraction_status = 'pending'
RETURNING *
""",
case_number, case_name, court, decision_date, chair_name, district,
tags_json, summary, full_text,
document_id, practice_area, appeal_subtype, is_binding,
proceeding_type,
)
if existing:
row = await conn.fetchrow(
"""
UPDATE case_law SET
case_name = $2,
court = COALESCE(NULLIF($3, ''), court),
date = COALESCE($4, date),
chair_name = COALESCE(NULLIF($5, ''), chair_name),
district = COALESCE(NULLIF($6, ''), district),
practice_area = $7,
appeal_subtype = $8,
subject_tags = $9,
summary = COALESCE(NULLIF($10, ''), summary),
full_text = $11,
source_type = 'appeals_committee',
source_kind = 'internal_committee',
is_binding = $12,
document_id = COALESCE($13, document_id),
extraction_status = 'processing',
halacha_extraction_status = 'pending'
WHERE id = $1
RETURNING *
""",
existing["id"], case_name, court, decision_date,
chair_name, district, practice_area, appeal_subtype,
tags_json, summary, full_text, is_binding, document_id,
)
else:
row = await conn.fetchrow(
"""
INSERT INTO case_law (
case_number, case_name, court, date, chair_name, district,
subject_tags, summary, full_text,
source_kind, source_type, document_id,
extraction_status, halacha_extraction_status,
practice_area, appeal_subtype, is_binding, proceeding_type
) VALUES (
$1, $2, $3, $4, $5, $6,
$7, $8, $9,
'internal_committee', 'appeals_committee', $10,
'processing', 'pending',
$11, $12, $13, $14
)
RETURNING *
""",
case_number, case_name, court, decision_date, chair_name, district,
tags_json, summary, full_text,
document_id, practice_area, appeal_subtype, is_binding,
proceeding_type,
)
return _row_to_case_law(row)
def _compute_searchable(row: dict, has_embedded_chunk: bool) -> bool:
"""Completeness contract (INV-DM1 / 02-data-model §2a).
A row is searchable IFF: canonical id present · case_name/practice_area/
source_kind present · ≥1 chunk with a non-null embedding · extraction
completed · metadata non-empty (≥1 of headnote/summary/subject_tags).
Pure — `has_embedded_chunk` is supplied by the caller (cross-table check).
"""
if not has_embedded_chunk:
return False
if (row.get("extraction_status") or "") != "completed":
return False
if not (row.get("case_number") or "").strip():
return False
if not (row.get("case_name") or "").strip():
return False
# practice_area is required only for identifier-keyed corpora (internal
# committee decisions, active cases). External precedents (e.g. בג"ץ) are
# legitimately cross-domain and may have no single practice_area.
if (row.get("source_kind") or "") != "external_upload":
if not (row.get("practice_area") or "").strip():
return False
if not (row.get("source_kind") or "").strip():
return False
tags = row.get("subject_tags") or []
has_meta = bool((row.get("headnote") or "").strip()) \
or bool((row.get("summary") or "").strip()) \
or (len(tags) > 0)
return has_meta
async def recompute_searchable(case_law_id: "UUID | str | None" = None) -> int:
"""Recompute and persist the `searchable` flag. Idempotent / reversible.
If case_law_id is None, recompute ALL rows (used by the V21 backfill and
the dry-run). Returns the number of rows now marked searchable=true.
"""
pool = await get_pool()
async with pool.acquire() as conn:
if case_law_id is not None:
cid = case_law_id if isinstance(case_law_id, UUID) else UUID(str(case_law_id))
rows = await conn.fetch(
"SELECT * FROM case_law WHERE id = $1", cid)
else:
rows = await conn.fetch("SELECT * FROM case_law")
n_true = 0
for r in rows:
row = dict(r)
tags = row.get("subject_tags")
if isinstance(tags, str):
try:
tags = json.loads(tags)
except (ValueError, TypeError):
tags = []
row["subject_tags"] = tags or []
has_chunk = await conn.fetchval(
"SELECT EXISTS(SELECT 1 FROM precedent_chunks "
"WHERE case_law_id = $1 AND embedding IS NOT NULL)", row["id"])
val = _compute_searchable(row, bool(has_chunk))
await conn.execute(
"UPDATE case_law SET searchable = $2 WHERE id = $1", row["id"], val)
if val:
n_true += 1
return n_true
async def update_case_law(case_law_id: UUID, **fields) -> dict | None:
"""Patch metadata fields on a case_law row.
@@ -3199,8 +3276,9 @@ async def search_precedent_library_semantic(
halacha_filters = [
"h.review_status IN ('approved', 'published')",
f"cl.source_kind = '{source_kind}'",
"cl.searchable = true",
]
chunk_filters = [f"cl.source_kind = '{source_kind}'"]
chunk_filters = [f"cl.source_kind = '{source_kind}'", "cl.searchable = true"]
h_params: list = [query_embedding, limit]
c_params: list = [query_embedding, limit]
h_idx = 3
@@ -3435,8 +3513,9 @@ async def search_precedent_library_lexical(
halacha_filters = [
"h.review_status IN ('approved', 'published')",
f"cl.source_kind = '{source_kind}'",
"cl.searchable = true",
]
chunk_filters = [f"cl.source_kind = '{source_kind}'"]
chunk_filters = [f"cl.source_kind = '{source_kind}'", "cl.searchable = true"]
# $1 = query, $2 = limit. Filters append starting at $3.
h_params: list = [query, limit]
c_params: list = [query, limit]

View File

@@ -197,6 +197,7 @@ async def ingest_document(
await db.set_case_law_halacha_status(case_law_id, "pending")
await db.request_metadata_extraction(case_law_id)
await db.request_halacha_extraction(case_law_id)
await db.recompute_searchable(case_law_id)
await progress("completed", 100,
f"נקלט: {stored_chunks} chunks. חילוץ הלכות ומטא-דאטה ממתינים בתור.")

View File

@@ -123,6 +123,9 @@ async def get_dashboard() -> dict:
total_corpus = await conn.fetchval("SELECT COUNT(*) FROM style_corpus")
total_patterns = await conn.fetchval("SELECT COUNT(*) FROM style_patterns")
total_case_law = await conn.fetchval("SELECT COUNT(*) FROM case_law")
non_searchable_case_law = await conn.fetchval(
"SELECT COUNT(*) FROM case_law WHERE NOT searchable"
)
# QA summary
qa_total = await conn.fetchval("SELECT COUNT(DISTINCT case_id) FROM qa_results")
@@ -154,6 +157,7 @@ async def get_dashboard() -> dict:
"style_corpus": total_corpus,
"style_patterns": total_patterns,
"case_law_entries": total_case_law,
"non_searchable_case_law": non_searchable_case_law,
},
"cases_by_status": cases_by_status,
"qa": {

View File

@@ -368,6 +368,8 @@ async def extract_and_apply(
if not suggested:
return {"status": "no_metadata", "fields": []}
result = await apply_to_record(case_law_id, suggested, overwrite_case_number=overwrite_case_number)
if result["updated"]:
await db.recompute_searchable(case_law_id)
return {
"status": "completed" if result["updated"] else "no_changes",
"fields": result["fields"],

View File

@@ -0,0 +1,119 @@
"""FU-2a: idempotent ingest + write-time normalization + searchable flag.
Offline tests for the *pure* pieces (canonical normalization, completeness
predicate) and ingest wiring. The real ON CONFLICT upsert is verified by a
DB smoke test against localhost:5433 (see plan Task 6), since it requires a
live Postgres partial unique index.
"""
from __future__ import annotations
import asyncio
from uuid import uuid4
import pytest
from legal_mcp.services import db, ingest
def _run(coro):
return asyncio.run(coro)
# ── GAP-06: canonical normalization (pure, deterministic) ──────────────
@pytest.mark.parametrize("raw,expected", [
("ערר 8137/24", "8137-24"),
(" עע\"מ 1/20 ", "1-20"),
("8126-03-25", "8126-03-25"), # month segment preserved
("בל\"מ 1010-01-25", "1010-01-25"),
("8047/23", "8047-23"),
])
def test_canonical_case_number(raw, expected):
assert db._canonical_case_number(raw) == expected
def test_canonical_does_not_invent_month():
# No month in input → none added (X1 §1).
assert db._canonical_case_number("8126/24") == "8126-24"
# ── GAP-13: completeness predicate (pure) ──────────────────────────────
def _complete_row():
return {
"case_number": "8047-23", "case_name": "פלוני נ' הוועדה",
"practice_area": "rishuy_uvniya", "source_kind": "internal_committee",
"extraction_status": "completed", "headnote": "תקציר",
"summary": "", "subject_tags": [],
}
def test_compute_searchable_true_when_complete():
assert db._compute_searchable(_complete_row(), has_embedded_chunk=True) is True
def test_compute_searchable_false_without_embedded_chunk():
assert db._compute_searchable(_complete_row(), has_embedded_chunk=False) is False
def test_compute_searchable_false_without_metadata():
row = _complete_row()
row["headnote"] = ""; row["summary"] = ""; row["subject_tags"] = []
assert db._compute_searchable(row, has_embedded_chunk=True) is False
def test_compute_searchable_false_when_extraction_incomplete():
row = _complete_row(); row["extraction_status"] = "pending"
assert db._compute_searchable(row, has_embedded_chunk=True) is False
def test_compute_searchable_false_without_core_fields():
row = _complete_row(); row["practice_area"] = ""
assert db._compute_searchable(row, has_embedded_chunk=True) is False
def test_compute_searchable_external_allows_empty_practice_area():
# External precedents (e.g. בג"ץ) are cross-domain — empty practice_area
# must NOT disqualify them, as long as the rest of the contract holds.
row = _complete_row()
row["source_kind"] = "external_upload"
row["practice_area"] = ""
assert db._compute_searchable(row, has_embedded_chunk=True) is True
# ── ingest wires in recompute_searchable (both types) ──────────────────
def test_ingest_calls_recompute_searchable(monkeypatch, tmp_path):
calls = {"recompute": [], "meta": [], "hal": []}
async def _extract_text(path): return ("text", 1, [0])
monkeypatch.setattr(ingest.extractor, "extract_text", _extract_text)
monkeypatch.setattr(ingest.extractor, "strip_nevo_preamble", lambda t: t)
monkeypatch.setattr(ingest.chunker, "chunk_document",
lambda t, page_offsets=None: [type("C", (), {
"chunk_index": 0, "content": "c", "section_type": "b",
"page_number": 1})()])
async def _embed(texts, input_type="document"): return [[0.0] * 8 for _ in texts]
monkeypatch.setattr(ingest.embeddings, "embed_texts", _embed)
async def _store(cid, dicts): return len(dicts)
monkeypatch.setattr(ingest.db, "store_precedent_chunks", _store)
async def _create_internal(**kw): return {"id": uuid4()}
monkeypatch.setattr(ingest.db, "create_internal_committee_decision", _create_internal)
async def _noop(*a, **k): return None
monkeypatch.setattr(ingest.db, "set_case_law_extraction_status", _noop)
monkeypatch.setattr(ingest.db, "set_case_law_halacha_status", _noop)
monkeypatch.setattr(ingest.db, "request_metadata_extraction",
lambda cid: calls["meta"].append(cid) or _noop())
monkeypatch.setattr(ingest.db, "request_halacha_extraction",
lambda cid: calls["hal"].append(cid) or _noop())
async def _recompute(cid): calls["recompute"].append(cid)
monkeypatch.setattr(ingest.db, "recompute_searchable", _recompute)
monkeypatch.setattr(ingest.config, "PARENT_DOC_RETRIEVAL_ENABLED", False)
monkeypatch.setattr(ingest.config, "MULTIMODAL_ENABLED", False)
from legal_mcp.services import internal_decisions
_run(internal_decisions.ingest_internal_decision(
case_number="8047/23", text="t", chair_name="x", practice_area="rishuy_uvniya"))
assert len(calls["recompute"]) == 1, "ingest must recompute searchable after success"

View File

@@ -72,6 +72,9 @@ def patched(monkeypatch, tmp_path):
async def _set_status(cid, status):
return None
async def _recompute_searchable(cid=None):
return 0
monkeypatch.setattr(extractor, "extract_text", _extract_text)
monkeypatch.setattr(extractor, "strip_nevo_preamble", _strip)
monkeypatch.setattr(chunker, "chunk_document", _chunk)
@@ -83,6 +86,7 @@ def patched(monkeypatch, tmp_path):
monkeypatch.setattr(db, "request_halacha_extraction", _req_hal)
monkeypatch.setattr(db, "set_case_law_extraction_status", _set_status)
monkeypatch.setattr(db, "set_case_law_halacha_status", _set_status)
monkeypatch.setattr(db, "recompute_searchable", _recompute_searchable)
# Force flat chunking + multimodal OFF unless a test flips it.
monkeypatch.setattr(config, "PARENT_DOC_RETRIEVAL_ENABLED", False)
monkeypatch.setattr(config, "MULTIMODAL_ENABLED", False)