"""Regression test for TaskMaster #108 / INV-G10 — re-extraction must NOT delete chair-approved/published halachot. Bug (2026-06-08 amiel incident, בל"מ 8126-03-25): ``reset_halacha_extraction`` ran an UNCONDITIONAL ``DELETE FROM halachot`` before re-extracting. A crash between the delete and the first chunk's store lost every chair approval (9 approved + their rule_type) and left the row stuck ``status='processing'`` with 0 rows. Fix: the delete now excludes ``review_status IN ('approved','published')`` so approvals survive a re-extract; the per-chunk dedup-on-insert (``store_halachot_for_chunk``) skips fresh extractions that duplicate a preserved approval, so no duplicates appear either. Runs fully OFFLINE — monkeypatches ``db.get_pool`` with a fake pool that captures every SQL string instead of hitting Postgres (same style as ``test_precedent_corpus_isolation.py``). Asserts the DELETE carries the approved/published exclusion and that the function reports preserved/deleted counts. """ from __future__ import annotations import asyncio from uuid import uuid4 import pytest from legal_mcp.services import db class _FakeTxn: async def __aenter__(self) -> "_FakeTxn": return self async def __aexit__(self, *exc) -> bool: # noqa: ANN002 return False class _FakeConn: def __init__(self) -> None: self.executed: list[str] = [] self.fetchvals: list[str] = [] async def execute(self, sql: str, *args) -> str: # noqa: ANN002 self.executed.append(sql) return "DELETE 3" # mimic asyncpg command tag so the count parse works async def fetchval(self, sql: str, *args) -> int: # noqa: ANN002 self.fetchvals.append(sql) return 9 # pretend 9 approved/published rows are present def transaction(self) -> _FakeTxn: return _FakeTxn() class _AcquireCtx: def __init__(self, conn: _FakeConn) -> None: self._conn = conn async def __aenter__(self) -> _FakeConn: return self._conn async def __aexit__(self, *exc) -> bool: # noqa: ANN002 return False class _FakePool: def __init__(self, conn: _FakeConn) -> None: self._conn = conn def acquire(self) -> _AcquireCtx: return _AcquireCtx(self._conn) @pytest.fixture() def fake_conn(monkeypatch: pytest.MonkeyPatch) -> _FakeConn: conn = _FakeConn() pool = _FakePool(conn) async def _get_pool() -> _FakePool: return pool monkeypatch.setattr(db, "get_pool", _get_pool) return conn def test_reset_halacha_extraction_preserves_approved(fake_conn: _FakeConn) -> None: loop = asyncio.new_event_loop() try: result = loop.run_until_complete(db.reset_halacha_extraction(uuid4())) finally: loop.close() delete_sql = next( q for q in fake_conn.executed if q.strip().upper().startswith("DELETE") ) norm = " ".join(delete_sql.split()) # INV-G10: the delete MUST exclude chair-approved/published halachot. assert "review_status NOT IN ('approved', 'published')" in norm, delete_sql # ...and must therefore be conditional — never an unconditional wipe. assert "WHERE case_law_id = $1 AND review_status NOT IN" in norm, delete_sql # The preserved-count query filters to exactly approved/published. assert any( "IN ('approved', 'published')" in q and "NOT IN" not in q for q in fake_conn.fetchvals ), fake_conn.fetchvals # Checkpoints are still cleared so every chunk re-processes. assert any("halacha_extracted_at = NULL" in q for q in fake_conn.executed) # Reports counts for provenance (G9) / caller logging. assert result == {"deleted": 3, "preserved": 9}