fix(halacha): חילוץ-מחדש משמר הלכות מאושרות (INV-G10, #108) #172

Merged
chaim merged 1 commit from worktree-fix-halacha-reextract-dataloss into main 2026-06-10 09:08:47 +00:00
5 changed files with 173 additions and 11 deletions

View File

@@ -4157,17 +4157,44 @@ async def store_halachot(case_law_id: UUID, halachot: list[dict]) -> int:
return len(halachot)
async def reset_halacha_extraction(case_law_id: UUID) -> dict:
    """Prepare a clean re-extraction WITHOUT destroying chair-approved work.

    Deletes only un-reviewed halachot (``review_status NOT IN ('approved',
    'published')``) and clears per-chunk checkpoints so every chunk is
    re-processed. Chair-approved / published halachot are PRESERVED — INV-G10:
    a human approval is never silently deleted by a re-extraction. The
    re-extractor's dedup-on-insert (:func:`store_halachot_for_chunk`) is
    expected to skip any freshly extracted halacha that duplicates a preserved
    one, so approvals survive without producing duplicates.

    History: this once wiped ALL halachot first, then re-extracted — a crash
    between the wipe and the first chunk's store lost every approval and left
    the row stuck ``status='processing'`` with 0 rows (the 2026-06-08 amiel
    incident, TaskMaster #108). Durable resume of the whole pipeline is
    X16/#114.

    Args:
        case_law_id: Precedent whose halachot and chunk checkpoints are reset.

    Returns:
        ``{"deleted": N, "preserved": M}`` — N un-reviewed rows removed,
        M approved/published rows left untouched.
    """
    pool = await get_pool()
    async with pool.acquire() as conn:
        # One transaction: the count, the conditional delete and the checkpoint
        # reset commit together or not at all.
        async with conn.transaction():
            # There must be NO unconditional "DELETE FROM halachot" here — that
            # was the data-loss bug this function exists to prevent.
            preserved = await conn.fetchval(
                "SELECT COUNT(*) FROM halachot WHERE case_law_id = $1 "
                "AND review_status IN ('approved', 'published')", case_law_id,
            )
            # NOTE(review): a row whose review_status is NULL matches neither
            # IN nor NOT IN (SQL three-valued logic), so it would be neither
            # deleted nor counted — confirm the column is NOT NULL.
            tag = await conn.execute(
                "DELETE FROM halachot WHERE case_law_id = $1 "
                "AND review_status NOT IN ('approved', 'published')", case_law_id,
            )
            await conn.execute(
                "UPDATE precedent_chunks SET halacha_extracted_at = NULL "
                "WHERE case_law_id = $1", case_law_id,
            )
    # The driver returns a command tag such as "DELETE 3"; the trailing token is
    # the affected-row count. Fall back to 0 if the tag is not in that shape.
    try:
        deleted = int(str(tag).split()[-1])
    except (ValueError, IndexError):
        deleted = 0
    return {"deleted": deleted, "preserved": int(preserved or 0)}
async def mark_all_chunks_extracted(case_law_id: UUID) -> int:

View File

@@ -6,8 +6,10 @@ structured list of halachot, validates each one against the source text,
embeds the rule statement, and stores everything as ``pending_review`` in
the ``halachot`` table.
All extraction is idempotent — calling ``extract(case_law_id)`` twice
deletes prior rows for that precedent first.
All extraction is idempotent — calling ``extract(case_law_id, force=True)``
twice drops the precedent's un-reviewed rows and re-extracts. Chair-approved /
published halachot are PRESERVED across a re-extract (INV-G10); see
``db.reset_halacha_extraction``.
Trust model:
Per chair decision, NO halacha is auto-published. Every extracted
@@ -530,8 +532,20 @@ async def _extract_impl(case_law_id: UUID, force: bool = False,
return {"status": "no_chunks", "extracted": 0, "stored": 0}
# force = clean slate; otherwise resume (skip already-checkpointed chunks).
# "Clean slate" preserves chair-approved/published halachot (INV-G10) — only
# un-reviewed rows are dropped; the per-chunk dedup-on-insert skips fresh
# extractions that duplicate a preserved approval, so approvals survive a
# re-extract without duplicating. See db.reset_halacha_extraction / #108.
preserved_approved = 0
if force:
await db.reset_halacha_extraction(case_law_id)
reset = await db.reset_halacha_extraction(case_law_id)
preserved_approved = reset.get("preserved", 0)
if preserved_approved:
logger.info(
"halacha_extractor: case_law=%s force re-extract — preserved %d "
"approved/published halachot (INV-G10), dropped %d un-reviewed.",
case_law_id, preserved_approved, reset.get("deleted", 0),
)
for c in chunks:
c["halacha_extracted_at"] = None
@@ -686,5 +700,6 @@ async def _extract_impl(case_law_id: UUID, force: bool = False,
"folded": folded,
"stored": stored,
"stored_this_run": stored_total,
"preserved_approved": preserved_approved,
"total_chunks": len(chunks),
}

View File

@@ -138,6 +138,10 @@ async def reextract_halachot(
) -> dict:
"""Re-run the halacha extractor on an existing precedent. Idempotent.
Chair-approved / published halachot are PRESERVED across the re-extract
(INV-G10) — only un-reviewed rows are replaced. See
``db.reset_halacha_extraction`` / TaskMaster #108.
**MCP-tool-only path.** This function calls into ``halacha_extractor``,
which calls ``claude_session`` — the local CLI is required. Invoking
this from the FastAPI container will raise ``Claude CLI not found``.
@@ -157,9 +161,10 @@ async def reextract_halachot(
# bad data. See note in db.request_metadata_extraction.
await progress("extracting_halachot", 50, "מחלץ הלכות מחדש")
# Explicit re-extraction = clean slate (force): wipe prior halachot +
# per-chunk checkpoints and redo all. (Queue draining / resume uses the
# default force=False so an interrupted run continues where it stopped.)
# Explicit re-extraction = clean slate (force): drop un-reviewed halachot +
# clear per-chunk checkpoints and redo all, but PRESERVE chair-approved /
# published rows (INV-G10; dedup-on-insert avoids duplicating them). (Queue
# draining / resume uses force=False so an interrupted run continues.)
result = await halacha_extractor.extract(case_law_id, force=True)
# Clear the queue timestamp on completion so the UI badge / worker queue
# don't keep showing this row. The queue worker (process_pending_extractions)

View File

@@ -183,7 +183,7 @@ async def precedent_library_delete(case_law_id: str) -> str:
async def precedent_extract_halachot(case_law_id: str) -> str:
"""הרצה מחדש של חילוץ ההלכות לפסיקה קיימת. הלכות קודמות נמחקות."""
"""הרצה מחדש של חילוץ ההלכות לפסיקה קיימת. הלכות שאושרו/פורסמו נשמרות (INV-G10); רק הלכות שלא-נבדקו מוחלפות."""
try:
cid = UUID(case_law_id)
except ValueError:

View File

@@ -0,0 +1,115 @@
"""Regression test for TaskMaster #108 / INV-G10 — re-extraction must NOT delete
chair-approved/published halachot.
Bug (2026-06-08 amiel incident, בל"מ 8126-03-25): ``reset_halacha_extraction``
ran an UNCONDITIONAL ``DELETE FROM halachot`` before re-extracting. A crash
between the delete and the first chunk's store lost every chair approval (9
approved + their rule_type) and left the row stuck ``status='processing'`` with
0 rows.
Fix: the delete now excludes ``review_status IN ('approved','published')`` so
approvals survive a re-extract; the per-chunk dedup-on-insert
(``store_halachot_for_chunk``) skips fresh extractions that duplicate a
preserved approval, so no duplicates appear either.
Runs fully OFFLINE — monkeypatches ``db.get_pool`` with a fake pool that
captures every SQL string instead of hitting Postgres (same style as
``test_precedent_corpus_isolation.py``). Asserts the DELETE carries the
approved/published exclusion and that the function reports preserved/deleted
counts.
"""
from __future__ import annotations
import asyncio
from uuid import uuid4
import pytest
from legal_mcp.services import db
class _FakeTxn:
async def __aenter__(self) -> "_FakeTxn":
return self
async def __aexit__(self, *exc) -> bool: # noqa: ANN002
return False
class _FakeConn:
def __init__(self) -> None:
self.executed: list[str] = []
self.fetchvals: list[str] = []
async def execute(self, sql: str, *args) -> str: # noqa: ANN002
self.executed.append(sql)
return "DELETE 3" # mimic asyncpg command tag so the count parse works
async def fetchval(self, sql: str, *args) -> int: # noqa: ANN002
self.fetchvals.append(sql)
return 9 # pretend 9 approved/published rows are present
def transaction(self) -> _FakeTxn:
return _FakeTxn()
class _AcquireCtx:
def __init__(self, conn: _FakeConn) -> None:
self._conn = conn
async def __aenter__(self) -> _FakeConn:
return self._conn
async def __aexit__(self, *exc) -> bool: # noqa: ANN002
return False
class _FakePool:
def __init__(self, conn: _FakeConn) -> None:
self._conn = conn
def acquire(self) -> _AcquireCtx:
return _AcquireCtx(self._conn)
@pytest.fixture()
def fake_conn(monkeypatch: pytest.MonkeyPatch) -> _FakeConn:
    """Swap ``db.get_pool`` for a stub and return the recording connection.

    The returned :class:`_FakeConn` is the one every ``pool.acquire()`` yields,
    so the test can inspect the SQL the code under test issued.
    """
    recorder = _FakeConn()
    stub_pool = _FakePool(recorder)

    async def _stub_get_pool() -> _FakePool:
        return stub_pool

    # monkeypatch restores the real get_pool when the test finishes.
    monkeypatch.setattr(db, "get_pool", _stub_get_pool)
    return recorder
def test_reset_halacha_extraction_preserves_approved(fake_conn: _FakeConn) -> None:
    """INV-G10: a reset must never issue a DELETE that can hit approved rows.

    Drives ``db.reset_halacha_extraction`` against the recording fake and
    checks the captured SQL plus the returned counts.
    """
    loop = asyncio.new_event_loop()
    try:
        result = loop.run_until_complete(db.reset_halacha_extraction(uuid4()))
    finally:
        loop.close()

    # Inspect EVERY delete against halachot, not just the first one: an
    # unconditional wipe issued before or after the conditional delete is
    # exactly the regression this test guards against.
    halachot_deletes = [
        " ".join(q.split())
        for q in fake_conn.executed
        if q.strip().upper().startswith("DELETE") and "halachot" in q
    ]
    assert halachot_deletes, fake_conn.executed
    for norm in halachot_deletes:
        # INV-G10: the delete MUST exclude chair-approved/published halachot.
        assert "review_status NOT IN ('approved', 'published')" in norm, norm
        # ...and must therefore be conditional — never an unconditional wipe.
        assert "WHERE case_law_id = $1 AND review_status NOT IN" in norm, norm

    # The preserved-count query filters to exactly approved/published.
    assert any(
        "IN ('approved', 'published')" in q and "NOT IN" not in q
        for q in fake_conn.fetchvals
    ), fake_conn.fetchvals
    # Checkpoints are still cleared so every chunk re-processes.
    assert any("halacha_extracted_at = NULL" in q for q in fake_conn.executed)
    # Reports counts for provenance (G9) / caller logging.
    assert result == {"deleted": 3, "preserved": 9}