diff --git a/mcp-server/src/legal_mcp/services/db.py b/mcp-server/src/legal_mcp/services/db.py index 0db67a0..ea79715 100644 --- a/mcp-server/src/legal_mcp/services/db.py +++ b/mcp-server/src/legal_mcp/services/db.py @@ -3,6 +3,7 @@ from __future__ import annotations import asyncio +import hashlib import json import logging import re @@ -1116,6 +1117,18 @@ ALTER TABLE cases ADD COLUMN IF NOT EXISTS blocks_stale boolean NOT NULL DEFAULT """ +# ── V23: case_law content/indexed hashes — re-index on content change (GAP-09) ── +# content_hash = SHA-256 of current full_text (written at the create boundary). +# indexed_hash = the content_hash the CURRENT chunks/embeddings were built from +# (set by mark_indexed after a successful store). Stale ⇔ content_hash IS +# DISTINCT FROM indexed_hash. embedding can't be a GENERATED column (needs an +# API call), so freshness is enforced by detection + reindex_case_law + health-check. +SCHEMA_V23_SQL = """ +ALTER TABLE case_law ADD COLUMN IF NOT EXISTS content_hash text NOT NULL DEFAULT ''; +ALTER TABLE case_law ADD COLUMN IF NOT EXISTS indexed_hash text; +""" + + async def _run_schema_migrations(pool: asyncpg.Pool) -> None: async with pool.acquire() as conn: await conn.execute(SCHEMA_SQL) @@ -1141,7 +1154,8 @@ async def _run_schema_migrations(pool: asyncpg.Pool) -> None: await conn.execute(SCHEMA_V20_SQL) await conn.execute(SCHEMA_V21_SQL) await conn.execute(SCHEMA_V22_SQL) - logger.info("Database schema initialized (v1-v22)") + await conn.execute(SCHEMA_V23_SQL) + logger.info("Database schema initialized (v1-v23)") async def init_schema() -> None: @@ -1279,6 +1293,16 @@ def _canonical_case_number(s: str) -> str: return s.strip().replace("/", "-") +def _content_hash(text: str) -> str: + """SHA-256 hex of the text — deterministic content fingerprint (FU-3/GAP-09). + + Empty/None → "" (a row with no text has no content fingerprint). + """ + if not text: + return "" + return hashlib.sha256(text.encode("utf-8")).hexdigest() + + async def get_case_by_number(case_number: str) -> dict | None: pool = await get_pool() norm = _normalize_case_number(case_number) @@ -2546,6 +2570,55 @@ async def get_case_law(case_law_id: UUID) -> dict | None: return _row_to_case_law(row) if row else None +async def mark_indexed(case_law_id: UUID) -> None: + """Mark a case_law row's embeddings as built from its current content (FU-3). + + Sets indexed_hash := content_hash. Call AFTER a successful chunk+embed+store. + """ + pool = await get_pool() + async with pool.acquire() as conn: + await conn.execute( + "UPDATE case_law SET indexed_hash = content_hash WHERE id = $1", + case_law_id, + ) + + +async def list_stale_case_law(limit: int = 500) -> list[dict]: + """case_law rows whose embeddings are stale vs current content (GAP-09/INV-G6).""" + pool = await get_pool() + async with pool.acquire() as conn: + rows = await conn.fetch( + """SELECT id, case_number, source_kind + FROM case_law + WHERE coalesce(full_text, '') <> '' + AND content_hash IS DISTINCT FROM indexed_hash + ORDER BY created_at LIMIT $1""", + limit, + ) + return [dict(r) for r in rows] + + +async def recompute_content_hashes() -> dict: + """Backfill (FU-3): set content_hash for all rows; set indexed_hash=content_hash + only where chunks already exist (those are already embedded). Rows with text but + no chunks get indexed_hash=NULL → surface as stale. Hash-only; no re-embed.""" + pool = await get_pool() + updated = 0 + async with pool.acquire() as conn: + rows = await conn.fetch("SELECT id, full_text FROM case_law") + for r in rows: + ch = _content_hash(r["full_text"] or "") + has_chunks = await conn.fetchval( + "SELECT EXISTS(SELECT 1 FROM precedent_chunks WHERE case_law_id = $1)", + r["id"]) + await conn.execute( + "UPDATE case_law SET content_hash = $2, " + "indexed_hash = CASE WHEN $3 THEN $2 ELSE indexed_hash END WHERE id = $1", + r["id"], ch, bool(has_chunks)) + updated += 1 + return {"updated": updated} + + async def add_case_law_relation( a_id: UUID, b_id: UUID, relation_type: str = "same_case_chain" ) -> None: @@ -2649,11 +2722,11 @@ async def create_external_case_law( summary, key_quote, full_text, source_url, source_kind, document_id, extraction_status, halacha_extraction_status, practice_area, appeal_subtype, - headnote, source_type, precedent_level, is_binding + headnote, source_type, precedent_level, is_binding, content_hash ) VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, 'external_upload', $10, 'processing', 'pending', - $11, $12, $13, $14, $15, $16 + $11, $12, $13, $14, $15, $16, $17 ) ON CONFLICT (case_number) WHERE source_kind <> 'internal_committee' DO UPDATE SET @@ -2674,13 +2747,15 @@ async def create_external_case_law( document_id = COALESCE(EXCLUDED.document_id, case_law.document_id), source_kind = 'external_upload', extraction_status = 'processing', - halacha_extraction_status = 'pending' + halacha_extraction_status = 'pending', + content_hash = EXCLUDED.content_hash RETURNING * """, case_number, case_name, court, decision_date, tags_json, summary, key_quote, full_text, source_url, document_id, practice_area, appeal_subtype, headnote, source_type, precedent_level, is_binding, + _content_hash(full_text), ) return _row_to_case_law(row) @@ -2722,13 +2797,13 @@ async def create_internal_committee_decision( subject_tags, summary, full_text, source_kind, source_type, document_id, extraction_status, halacha_extraction_status, - practice_area, appeal_subtype, is_binding, proceeding_type + practice_area, appeal_subtype, is_binding, proceeding_type, content_hash ) VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, 'internal_committee', 'appeals_committee', $10, 'processing', 'pending', - $11, $12, $13, $14 + $11, $12, $13, $14, $15 ) ON CONFLICT (case_number, proceeding_type) WHERE source_kind = 'internal_committee' @@ -2748,13 +2823,14 @@ async def create_internal_committee_decision( is_binding = EXCLUDED.is_binding, document_id = COALESCE(EXCLUDED.document_id, case_law.document_id), extraction_status = 'processing', - halacha_extraction_status = 'pending' + halacha_extraction_status = 'pending', + content_hash = EXCLUDED.content_hash RETURNING * """, case_number, case_name, court, decision_date, chair_name, district, tags_json, summary, full_text, document_id, practice_area, appeal_subtype, is_binding, - proceeding_type, + proceeding_type, _content_hash(full_text), ) return _row_to_case_law(row)