FU-3: Re-Index on Content Change — Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.
Goal: Detect content changes via a SHA-256 content_hash, expose a standalone reindex_case_law that re-embeds from stored full_text (no re-OCR, no file needed), and surface embedding-drift in the health-check — enforcing INV-G6 where embeddings can't be DB-GENERATED.
Architecture: Two additive case_law columns (V23): content_hash (hash of current full_text, written at the create boundary) and indexed_hash (hash the current chunks/embeddings were built from, set by mark_indexed after a successful store). Stale ⇔ content_hash IS DISTINCT FROM indexed_hash. reindex_case_law reuses the canonical _chunk_embed_store over stored text. Backfill only computes hashes (no re-embed — existing rows keep their vectors).
Tech Stack: Python 3.12, asyncpg, PostgreSQL@localhost:5433, voyage embeddings API, pytest offline, .venv at mcp-server/.venv.
Spec: docs/superpowers/specs/2026-05-30-fu3-reindex-on-change-design.md
Run tests: cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_reindex_on_change.py -v
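The staleness rule from the Architecture paragraph can be read as a NULL-safe inequality. The following is a minimal sketch of that rule in plain Python, not code from the repository; the function name is_stale is hypothetical and only mirrors the SQL predicate content_hash IS DISTINCT FROM indexed_hash.

```python
from __future__ import annotations


def is_stale(content_hash: str | None, indexed_hash: str | None) -> bool:
    """Python mirror of SQL `content_hash IS DISTINCT FROM indexed_hash`.

    IS DISTINCT FROM treats NULL as an ordinary comparable value:
    NULL vs a value is distinct, NULL vs NULL is not. Python's != on
    None and str behaves the same way, so a plain != reproduces it.
    """
    return content_hash != indexed_hash


# Freshly created row: hash written, nothing indexed yet.
print(is_stale("ab12", None))    # True
# After mark_indexed copies content_hash into indexed_hash.
print(is_stale("ab12", "ab12"))  # False
# full_text changed later and content_hash was rewritten.
print(is_stale("cd34", "ab12"))  # True
```

The queries in this plan additionally require a non-empty full_text, so a row with no text is never reported as stale even though its indexed_hash is NULL.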
File Structure
- Modify mcp-server/src/legal_mcp/services/db.py: _content_hash; V23 migration; content_hash in create_external_case_law / create_internal_committee_decision (create_case is out of scope, see Task 2 Step 4); mark_indexed; list_stale_case_law; recompute_content_hashes.
- Modify mcp-server/src/legal_mcp/services/ingest.py: reindex_case_law; call mark_indexed after _chunk_embed_store in ingest_document.
- Modify mcp-server/src/legal_mcp/services/metrics.py: stale_embedding_case_law count.
- Modify mcp-server/src/legal_mcp/tools/precedent_library.py + server.py: MCP tool precedent_reindex.
- Create mcp-server/tests/test_reindex_on_change.py.
Task 1: Failing tests
Files: Create mcp-server/tests/test_reindex_on_change.py
- Step 1: Write the failing tests
"""FU-3: re-index on content change (offline, monkeypatched I/O)."""
from __future__ import annotations

import asyncio
from uuid import uuid4

import pytest

from legal_mcp.services import db, ingest


def _run(coro):
    return asyncio.run(coro)


# ── content_hash is deterministic ──────────────────────────────────────
def test_content_hash_deterministic():
    h1 = db._content_hash("פסק דין כלשהו")
    h2 = db._content_hash("פסק דין כלשהו")
    assert h1 == h2 and len(h1) == 64  # sha256 hex


def test_content_hash_empty_is_blank():
    assert db._content_hash("") == ""
    assert db._content_hash(None) == ""


def test_content_hash_changes_with_text():
    assert db._content_hash("alpha") != db._content_hash("beta")


# ── mark_indexed copies content_hash → indexed_hash ─────────────────────
def test_mark_indexed_executes_update(monkeypatch):
    seen = {}

    class _Conn:
        async def execute(self, q, *a):
            seen["q"] = q
            seen["args"] = a

        async def __aenter__(self):
            return self

        async def __aexit__(self, *a):
            return False

    class _Pool:
        def acquire(self):
            return _Conn()

    async def _pool():
        return _Pool()

    monkeypatch.setattr(db, "get_pool", _pool)
    cid = uuid4()
    _run(db.mark_indexed(cid))
    assert "indexed_hash" in seen["q"] and "content_hash" in seen["q"]
    assert seen["args"][0] == cid


# ── reindex_case_law re-embeds from stored text, no extractor/LLM ───────
def test_reindex_case_law_uses_stored_text(monkeypatch):
    cid = uuid4()
    calls = {"chunk_embed_store": [], "mark_indexed": []}

    async def _get_case_law(x):
        return {"id": cid, "full_text": "טקסט שמור של ההחלטה"}

    monkeypatch.setattr(ingest.db, "get_case_law", _get_case_law)

    async def _ces(case_law_id, text, page_offsets, page_count, progress):
        calls["chunk_embed_store"].append((case_law_id, text))
        return 5

    monkeypatch.setattr(ingest, "_chunk_embed_store", _ces)

    async def _mark(x):
        calls["mark_indexed"].append(x)

    monkeypatch.setattr(ingest.db, "mark_indexed", _mark)
    out = _run(ingest.reindex_case_law(cid))
    assert out["chunks"] == 5 and out["reindexed"] is True
    assert calls["chunk_embed_store"][0][1] == "טקסט שמור של ההחלטה"
    assert calls["mark_indexed"] == [cid]


def test_reindex_case_law_missing_row_raises(monkeypatch):
    async def _none(x):
        return None

    monkeypatch.setattr(ingest.db, "get_case_law", _none)
    with pytest.raises(ValueError, match="not found"):
        _run(ingest.reindex_case_law(uuid4()))
- Step 2: Run to verify failure
Run: cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_reindex_on_change.py -v
Expected: FAIL — AttributeError: ... no attribute '_content_hash' / mark_indexed / reindex_case_law.
- Step 3: Commit
cd ~/legal-ai
git add mcp-server/tests/test_reindex_on_change.py
git commit -m "test(reindex): failing tests for content-hash re-index (FU-3)"
Task 2: V23 + hash helpers + content_hash at write
Files: Modify mcp-server/src/legal_mcp/services/db.py
- Step 1: Ensure hashlib import + add _content_hash
READ the top imports of db.py. If import hashlib is absent, add it. Add this helper near _canonical_case_number (~line 1227):
def _content_hash(text: str | None) -> str:
    """SHA-256 hex of the text: a deterministic content fingerprint (FU-3/GAP-09).

    Empty/None → "" (a row with no text has no content fingerprint).
    """
    if not text:
        return ""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()
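To make the helper's contract concrete, here is a small standalone sketch. It carries a local copy of the function (named content_hash here so the snippet runs without db.py) and shows the properties the tests rely on: a fixed-length hex digest, determinism, sensitivity to any byte change including trailing whitespace, and the empty/None case.

```python
from __future__ import annotations

import hashlib


def content_hash(text: str | None) -> str:
    # Local copy of the helper so this snippet is self-contained.
    if not text:
        return ""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


original = "פסק דין כלשהו"

print(len(content_hash(original)))                             # 64
print(content_hash(original) == content_hash(original))        # True
print(content_hash(original) == content_hash(original + " "))  # False
print(content_hash(None) == content_hash("") == "")            # True
```

Because the digest is computed over the raw UTF-8 bytes, even a whitespace-only edit to full_text marks the row as stale. Whether that sensitivity is desirable is a design choice this plan accepts as-is.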
- Step 2: Add SCHEMA_V23_SQL after SCHEMA_V22_SQL + wire it
READ near SCHEMA_V22_SQL and _run_schema_migrations. Add after the V22 block:
# ── V23: case_law content/indexed hashes — re-index on content change (GAP-09) ──
# content_hash = SHA-256 of current full_text (written at the create boundary).
# indexed_hash = the content_hash the CURRENT chunks/embeddings were built from
# (set by mark_indexed after a successful store). Stale ⇔ content_hash IS
# DISTINCT FROM indexed_hash. embedding can't be a GENERATED column (needs an
# API call), so freshness is enforced by detection + reindex_case_law + health-check.
SCHEMA_V23_SQL = """
ALTER TABLE case_law ADD COLUMN IF NOT EXISTS content_hash text NOT NULL DEFAULT '';
ALTER TABLE case_law ADD COLUMN IF NOT EXISTS indexed_hash text;
"""
After await conn.execute(SCHEMA_V22_SQL) add await conn.execute(SCHEMA_V23_SQL); bump the log line to v1-v23.
- Step 3: Write content_hash in the two case_law create functions
In create_external_case_law and create_internal_committee_decision (db.py ~2610-2760), the INSERT ... ON CONFLICT ... DO UPDATE was built in FU-2a. For EACH:
- Add content_hash to the INSERT column list (append after the last data column, before the closing parenthesis of the column list).
- Add a matching $N placeholder in VALUES (next number after the current max).
- Add content_hash = EXCLUDED.content_hash to the DO UPDATE SET clause.
- Append _content_hash(full_text) as the LAST positional arg of the conn.fetchrow(...) call (matching the new $N).
CRITICAL: the new placeholder number must equal (current highest $N) + 1, and the new arg must be appended LAST in the args tuple so that its position matches the new placeholder. Read the current SQL + args carefully and count. After editing, verify param count = placeholder count (the Step 6 test run only catches a db.py that no longer imports; the DB smoke in Task 6 confirms at runtime).
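The counting rule above can also be checked mechanically before touching the database. The following is a hedged sketch, not part of db.py: check_placeholders is a hypothetical helper, and the SQL shown is a simplified stand-in for the real FU-2a upsert (its column list is an assumption). The regex is naive and would also match a $N that appears inside a string literal.

```python
import re


def check_placeholders(sql: str, args: tuple) -> None:
    """Raise ValueError unless sql uses exactly $1..$len(args)."""
    used = {int(n) for n in re.findall(r"\$(\d+)", sql)}
    expected = set(range(1, len(args) + 1))
    if used != expected:
        raise ValueError(
            f"placeholders {sorted(used)} do not match {len(args)} args"
        )


# Simplified, hypothetical upsert with content_hash appended as $3.
sql = """
INSERT INTO case_law (case_number, full_text, content_hash)
VALUES ($1, $2, $3)
ON CONFLICT (case_number) DO UPDATE
SET full_text = EXCLUDED.full_text,
    content_hash = EXCLUDED.content_hash
"""

check_placeholders(sql, ("123/45", "some text", "deadbeef"))  # passes silently

try:
    # Forgetting to append the new arg is caught before any DB call.
    check_placeholders(sql, ("123/45", "some text"))
except ValueError as exc:
    print("mismatch:", exc)
```

A check like this only confirms that counts line up; it cannot tell whether the args are in the right order, so reading the SQL next to the args tuple is still required.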
- Step 4: Do NOT write content_hash in create_case (scope check)
create_case (db.py ~1130-1165) inserts into cases, a different table that holds active appeal cases. FU-3's scope is case_law (the corpus) only, so do NOT alter create_case or the cases table here. (The spec §3 mentioned create_case for normalization in FU-2a; it does not apply to FU-3 hashing. Skip create_case.)
- Step 5: Add mark_indexed, list_stale_case_law, recompute_content_hashes (after get_case_law, ~line 2547)
async def mark_indexed(case_law_id: UUID) -> None:
    """Mark a case_law row's embeddings as built from its current content (FU-3).

    Sets indexed_hash := content_hash. Call AFTER a successful chunk+embed+store.
    """
    pool = await get_pool()
    async with pool.acquire() as conn:
        await conn.execute(
            "UPDATE case_law SET indexed_hash = content_hash WHERE id = $1",
            case_law_id,
        )


async def list_stale_case_law(limit: int = 500) -> list[dict]:
    """case_law rows whose embeddings are stale vs current content (GAP-09/INV-G6)."""
    pool = await get_pool()
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            """SELECT id, case_number, source_kind
               FROM case_law
               WHERE coalesce(full_text, '') <> ''
                 AND content_hash IS DISTINCT FROM indexed_hash
               ORDER BY created_at LIMIT $1""",
            limit,
        )
    return [dict(r) for r in rows]


async def recompute_content_hashes() -> dict:
    """Backfill (FU-3): set content_hash for all rows; set indexed_hash=content_hash
    only where chunks already exist (those are already embedded). Rows with text but
    no chunks get indexed_hash=NULL → surface as stale. Hash-only; no re-embed."""
    pool = await get_pool()
    updated = 0
    async with pool.acquire() as conn:
        rows = await conn.fetch("SELECT id, full_text FROM case_law")
        for r in rows:
            ch = _content_hash(r["full_text"] or "")
            has_chunks = await conn.fetchval(
                "SELECT EXISTS(SELECT 1 FROM precedent_chunks WHERE case_law_id = $1)",
                r["id"])
            await conn.execute(
                "UPDATE case_law SET content_hash = $2, "
                "indexed_hash = CASE WHEN $3 THEN $2 ELSE indexed_hash END WHERE id = $1",
                r["id"], ch, bool(has_chunks))
            updated += 1
    return {"updated": updated}
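The backfill's per-row decision can be traced without a database. This is an illustrative sketch under the assumption that indexed_hash is NULL on every row right after V23 is applied; backfill_row and is_reported_stale are hypothetical names that mirror the UPDATE in recompute_content_hashes and the WHERE clause in list_stale_case_law.

```python
from __future__ import annotations

import hashlib


def backfill_row(
    full_text: str | None, has_chunks: bool, indexed_hash: str | None
) -> tuple[str, str | None]:
    """Return the (content_hash, indexed_hash) pair the backfill leaves on a row."""
    ch = hashlib.sha256(full_text.encode("utf-8")).hexdigest() if full_text else ""
    # Rows that already have chunks are treated as indexed from the current
    # text; every other row keeps the indexed_hash it already had.
    return ch, (ch if has_chunks else indexed_hash)


def is_reported_stale(
    full_text: str | None, content_hash: str, indexed_hash: str | None
) -> bool:
    """Mirror of: coalesce(full_text,'') <> '' AND content_hash IS DISTINCT FROM indexed_hash."""
    return bool(full_text) and content_hash != indexed_hash


cases = [("ruling text", True), ("ruling text", False), ("", False)]
for text, has_chunks in cases:
    ch, ih = backfill_row(text, has_chunks, None)
    print(repr(text), has_chunks, "stale:", is_reported_stale(text, ch, ih))
```

The three cases correspond to an already-embedded row (not stale), a row with text but no chunks (stale), and a row with no text (never reported).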
- Step 6: Run the helper tests
Run: cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_reindex_on_change.py -k "content_hash or mark_indexed" -v
Expected: test_content_hash_* (3) + test_mark_indexed_executes_update PASS.
- Step 7: Commit
cd ~/legal-ai
git add mcp-server/src/legal_mcp/services/db.py
git commit -m "feat(reindex): V23 content/indexed hashes + helpers + write content_hash (GAP-09, FU-3)"
Task 3: reindex_case_law + mark_indexed on ingest
Files: Modify mcp-server/src/legal_mcp/services/ingest.py
- Step 1: Call mark_indexed after successful chunk+embed+store in ingest_document
READ ingest_document — find the line stored_chunks = await _chunk_embed_store(case_law_id, raw_text, page_offsets, page_count, progress) (~line 184). Immediately AFTER it, add:
await db.mark_indexed(case_law_id)
(After a fresh ingest, chunks were just built from the current text → indexed_hash = content_hash.)
- Step 2: Add reindex_case_law (append to ingest.py)
async def reindex_case_law(
    case_law_id: "UUID | str",
    progress: ProgressCb | None = None,
) -> dict:
    """Re-chunk + re-embed an existing case_law row from its STORED full_text (GAP-09).

    No re-extract / no re-OCR (uses the stored text; see feedback_no_reocr_retrofit)
    and no LLM/CLI (only chunker + voyage embeddings), so it is safe to run anywhere.
    Idempotent: store_precedent_chunks(_hierarchical) is DELETE-then-INSERT.
    """
    progress = progress or _noop_progress
    cid = case_law_id if isinstance(case_law_id, UUID) else UUID(str(case_law_id))
    row = await db.get_case_law(cid)
    if not row:
        raise ValueError(f"case_law not found: {cid}")
    text = (row.get("full_text") or "").strip()
    if not text:
        raise ValueError("case_law has no stored full_text to re-index")
    # No page data is available when re-indexing from stored text.
    stored = await _chunk_embed_store(cid, text, None, 0, progress)
    # Only reached if the store succeeded; a failure leaves the row stale.
    await db.mark_indexed(cid)
    # Hebrew progress message: "re-embedded: N chunks".
    await progress("completed", 100, f"הוטמע מחדש: {stored} chunks")
    return {"status": "completed", "case_law_id": str(cid), "chunks": stored, "reindexed": True}
(UUID, db, _chunk_embed_store, _noop_progress, ProgressCb are already in ingest.py.)
- Step 3: Run reindex tests
Run: cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_reindex_on_change.py -v
Expected: ALL pass (incl test_reindex_case_law_uses_stored_text, test_reindex_case_law_missing_row_raises).
- Step 4: Commit
cd ~/legal-ai
git add mcp-server/src/legal_mcp/services/ingest.py
git commit -m "feat(reindex): reindex_case_law from stored text + mark_indexed on ingest (GAP-09, FU-3)"
Task 4: Health-check drift count
Files: Modify mcp-server/src/legal_mcp/services/metrics.py
- Step 1: Add stale_embedding_case_law count
READ metrics.py — the aggregation that holds non_searchable_case_law / cases_with_stale_blocks (added in FU-2a/FU-7). Add a sibling, mirroring the exact pattern:
stale_embedding_case_law = await conn.fetchval(
    "SELECT COUNT(*) FROM case_law "
    "WHERE coalesce(full_text,'') <> '' AND content_hash IS DISTINCT FROM indexed_hash")
and expose it in the returned summary dict: "stale_embedding_case_law": stale_embedding_case_law.
- Step 2: Smoke-import + commit
Run: cd ~/legal-ai/mcp-server && .venv/bin/python -c "from legal_mcp.services import metrics; print('clean')"
cd ~/legal-ai
git add mcp-server/src/legal_mcp/services/metrics.py
git commit -m "feat(reindex): health-check stale_embedding_case_law count (GAP-09, FU-3)"
Task 5: MCP tool precedent_reindex
Files: Modify mcp-server/src/legal_mcp/tools/precedent_library.py, mcp-server/src/legal_mcp/server.py
- Step 1: Add the tool function in precedent_library.py (mirror precedent_extract_metadata)
READ precedent_extract_metadata (tools/precedent_library.py ~205-216) for the _ok/_err/UUID pattern. Add:
async def precedent_reindex(case_law_id: str) -> str:
    """Re-chunk + re-embed an existing case-law record from its stored full_text (FU-3/GAP-09).

    Use to fix embedding drift or after a content change. Does not run OCR/LLM; only
    chunking + voyage embeddings. Idempotent (deletes and rebuilds the chunks).
    """
    try:
        cid = UUID(case_law_id)
    except ValueError:
        # Hebrew error message: "invalid case_law_id".
        return _err("case_law_id לא תקין")
    try:
        from legal_mcp.services import ingest
        result = await ingest.reindex_case_law(cid)
    except Exception as e:
        return _err(str(e))
    return _ok(result)
- Step 2: Register in server.py (mirror the precedent tools' @mcp.tool() registration)
READ server.py — find where precedent_extract_metadata (or another precedent_* tool) is registered with @mcp.tool() and delegated to tools.precedent_library. Add an equivalent registration for precedent_reindex following the identical pattern (decorator + delegation + the same import style). Report the exact registration block you added.
- Step 3: Smoke-import + commit
Run: cd ~/legal-ai/mcp-server && .venv/bin/python -c "from legal_mcp.tools import precedent_library; import legal_mcp.server; print('clean')"
cd ~/legal-ai
git add mcp-server/src/legal_mcp/tools/precedent_library.py mcp-server/src/legal_mcp/server.py
git commit -m "feat(reindex): precedent_reindex MCP tool (GAP-09, FU-3)"
Task 6: Backfill + full suite + DB smoke + lint + TaskMaster
- Step 1: Full offline suite
Run: cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/ -q
Expected: all pass (FU-1/2a/7 + new FU-3). If a pre-existing test that calls ingest_document breaks because mark_indexed isn't stubbed, fix that fixture to stub db.mark_indexed (same pattern as the FU-2a recompute_searchable fixture fix). Report.
- Step 2: DB smoke + backfill (real Postgres — applies V23, runs backfill)
cd ~/legal-ai && set -a && source ~/.env 2>/dev/null && set +a
cd mcp-server && .venv/bin/python -c "
import asyncio
from legal_mcp.services import db

async def main():
    await db.get_pool()  # applies V23
    pool = await db.get_pool()
    async with pool.acquire() as c:
        cols = await c.fetchval(\"SELECT count(*) FROM information_schema.columns WHERE table_name='case_law' AND column_name IN ('content_hash','indexed_hash')\")
        print('V23 columns present:', cols, '(expect 2)')
    res = await db.recompute_content_hashes()
    print('backfill:', res)
    stale = await db.list_stale_case_law()
    print('stale after backfill:', len(stale))

asyncio.run(main())
" 2>&1 | grep -vE 'INFO|WARNING|httpx|deprecat|command not found|\^\^\^' | tail -5
Expected: V23 columns present: 2, backfill updated ~129, stale after backfill: a small number (rows with text but no chunks, e.g. cited_only). Report the stale count.
- Step 3: Lint
Run: cd ~/legal-ai/mcp-server && .venv/bin/python -m ruff check src/legal_mcp/services/db.py src/legal_mcp/services/ingest.py 2>/dev/null; echo "exit=$?"
Expected: exit=0 with no findings. Because stderr is discarded, a non-zero exit with no output means ruff is not available in the venv.
- Step 4: TaskMaster — controller marks #61 + subtask 61.1 done (61.2 already cancelled), verifies via MCP.
Self-Review Notes
- GAP-09 → content_hash detection (Task 2) + reindex_case_law (Task 3) + drift health-check (Task 4) + MCP tool (Task 5).
- No re-OCR: reindex uses stored full_text only (Task 3), honoring feedback_no_reocr_retrofit.
- Backfill is hash-only (Task 6 Step 2): no re-embed, no API cost; existing vectors untouched.
- #61.2 closed (not-applicable, in the spec commit), so there is no multimodal backfill task here.
- Scope: case_law only; create_case / cases table NOT touched (Task 2 Step 4).
- Type consistency: _content_hash(text)->str, mark_indexed(case_law_id), reindex_case_law(id)->{chunks,reindexed}, list_stale_case_law(), recompute_content_hashes()->{updated}; names identical across tasks + tests.
- Param-count risk (Task 2 Step 3): the FU-2a upsert SQL must get exactly one new placeholder + one new arg per function; verified at runtime by the Task 6 DB smoke (a mismatch raises immediately).