From 23438922209dbcfad9c0728c71e36fc81cd58716 Mon Sep 17 00:00:00 2001 From: Chaim Date: Mon, 15 Jun 2026 04:09:54 +0000 Subject: [PATCH] =?UTF-8?q?fix(extraction):=20reconcile=20=D7=9C=D7=AA?= =?UTF-8?q?=D7=99=D7=A7=D7=99=D7=9D-=D7=99=D7=AA=D7=95=D7=9E=D7=99=D7=9D?= =?UTF-8?q?=20=D7=91=D7=AA=D7=95=D7=A8=20=E2=80=94=20pending+requested=5Fa?= =?UTF-8?q?t=3DNULL=20(#139)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit תיק יכול להיות _extraction_status='pending' עם _requested_at=NULL — מעולם-לא-נכנס-לתור (מסלולי bulk/מיגרציה, או status שנכתב לפני החותם), והדריינר (סורק requested_at IS NOT NULL) עיוור אליו לנצח → ה-backlog מתנקז בשקט לאפס. requeue_stale_processing_extractions מרפא רק 'processing'. נצפה 2026-06-14: 96 תיקים pending אך 0 בתור (תוקנו ידנית). תיקון (G1 — שחזור invariant במקור, G2 — predicate יחיד): - db.reconcile_orphaned_pending_extractions(kind=) — kind-agnostic, מחזיר את invariant "שורה ברת-חילוץ ⇒ בתור": חותם requested_at ל-rows שהם pending + requested_at IS NULL + EXTRACTION_ELIGIBLE_PREDICATE (אותו מסנן של #140 — cited_only/chunkless לעולם לא נדחפים). אידמפוטנטי (rows מסומנים לא נתפסים). - precedent_library.process_pending_extractions קורא reconcile אחרי requeue_stale ולפני list — תיקים-משוחזרים נקלטים באותו pass. מנגנון-ריפוי יחיד (G2), לא מסלול מקביל; requeue_stale='processing', reconcile='pending'. - request_halacha_extraction מציב status='pending' עם החותם (סימטרי ל-metadata) — סוגר את חלון-ה-drift שמייצר pending+NULL מלכתחילה. מצב חי נקי (0 יתומים-כשירים אחרי התיקון-הידני); זהו תיקון מונע — הדריינר יְרַפֵּא יתומים עתידיים אוטומטית. בדיקות: test_extraction_orphan_reconcile (predicate משותף, pending+NULL בלבד, מובחן מ-requeue_stale, request_halacha סימטרי), שני ה-kinds. כל 349 עוברות. Invariants: G1, G2 (predicate משותף עם #140, ריפוי יחיד), INV-G3/INV-DUR1 (X16), INV-G4 (אין בליעה שקטה — reconcile מתעד), G12. Co-Authored-By: Claude Opus 4.8 (1M context) --- mcp-server/src/legal_mcp/services/db.py | 48 ++++++++++- .../legal_mcp/services/precedent_library.py | 11 +++ .../tests/test_extraction_orphan_reconcile.py | 85 +++++++++++++++++++ 3 files changed, 142 insertions(+), 2 deletions(-) create mode 100644 mcp-server/tests/test_extraction_orphan_reconcile.py diff --git a/mcp-server/src/legal_mcp/services/db.py b/mcp-server/src/legal_mcp/services/db.py index 77c75e5..aec33ef 100644 --- a/mcp-server/src/legal_mcp/services/db.py +++ b/mcp-server/src/legal_mcp/services/db.py @@ -6736,10 +6736,17 @@ async def request_metadata_extraction(case_law_id: UUID) -> bool: async def request_halacha_extraction(case_law_id: UUID) -> bool: """Same but for halacha extraction. See note on - :func:`request_metadata_extraction` re: opening to all source kinds.""" + :func:`request_metadata_extraction` re: opening to all source kinds. + + Sets ``halacha_extraction_status='pending'`` alongside the timestamp — + symmetric to :func:`request_metadata_extraction` — so status and queue-stamp + are written together and a re-request never leaves a stale terminal badge. + This also closes the drift window where a row could end up 'pending' with a + NULL requested_at (orphaned from the queue) (#139).""" pool = await get_pool() result = await pool.execute( - "UPDATE case_law SET halacha_extraction_requested_at = now() " + "UPDATE case_law SET halacha_extraction_requested_at = now(), " + "halacha_extraction_status = 'pending' " "WHERE id = $1", case_law_id, ) @@ -6828,6 +6835,43 @@ async def requeue_stale_processing_extractions(kind: str = "halacha") -> int: return 0 +async def reconcile_orphaned_pending_extractions(kind: str = "halacha") -> int: + """Re-stamp eligible 'pending' rows that fell off the queue. Returns count. + + ``requeue_stale_processing_extractions`` heals only ``status='processing'`` + orphans. A row can also be ``status='pending'`` with ``requested_at IS NULL`` + — never enqueued (bulk/migration paths, or a status set before the stamp) — + and the queue (``requested_at IS NOT NULL``) is blind to it forever, so the + backlog drains silently to nothing (#139, INV-DUR1). + + This restores the "eligible row ⇒ in the queue" invariant: it stamps + ``requested_at`` (and re-affirms 'pending') for rows that are pending, + unstamped, and EXTRACTION-eligible — using the SAME + ``EXTRACTION_ELIGIBLE_PREDICATE`` the queue reader uses (#140, G2), so + cited_only / chunkless stubs are never proactively enqueued. kind-agnostic + (metadata + halacha). Idempotent: a row already stamped is not matched, so + concurrent runs converge on the same ``now()`` stamp harmlessly. + """ + status_col = ( + "metadata_extraction_status" if kind == "metadata" + else "halacha_extraction_status" + ) + req_col = ( + "metadata_extraction_requested_at" if kind == "metadata" + else "halacha_extraction_requested_at" + ) + pool = await get_pool() + tag = await pool.execute( + f"UPDATE case_law SET {req_col} = now(), {status_col} = 'pending' " + f"WHERE {status_col} = 'pending' AND {req_col} IS NULL " + f"AND {EXTRACTION_ELIGIBLE_PREDICATE}" + ) + try: + return int(str(tag).split()[-1]) + except (ValueError, IndexError): + return 0 + + async def extraction_queue_status() -> dict: """Pending-extraction queue depth per kind (INV-TOOL4 visibility / GAP-45). diff --git a/mcp-server/src/legal_mcp/services/precedent_library.py b/mcp-server/src/legal_mcp/services/precedent_library.py index 1aa8012..d153000 100644 --- a/mcp-server/src/legal_mcp/services/precedent_library.py +++ b/mcp-server/src/legal_mcp/services/precedent_library.py @@ -231,6 +231,17 @@ async def process_pending_extractions(kind: str = "metadata", limit: int = 20) - if healed: logger.warning("self-healed %d stale '%s' processing row(s)", healed, kind) + # Re-enqueue eligible 'pending' rows that never got a queue stamp (orphaned + # from the queue — bulk/migration paths). requeue_stale only covers + # 'processing'; this covers 'pending' with requested_at IS NULL (#139). Runs + # before the list below so reclaimed rows drain in this same pass. + reconciled = await db.reconcile_orphaned_pending_extractions(kind=kind) + if reconciled: + logger.warning( + "reconciled %d orphaned 'pending' (no queue stamp) '%s' row(s)", + reconciled, kind, + ) + pending = await db.list_pending_extraction_requests(kind=kind, limit=limit) if not pending: return {"status": "no_pending", "kind": kind, "processed": 0, "results": []} diff --git a/mcp-server/tests/test_extraction_orphan_reconcile.py b/mcp-server/tests/test_extraction_orphan_reconcile.py new file mode 100644 index 0000000..0160ea0 --- /dev/null +++ b/mcp-server/tests/test_extraction_orphan_reconcile.py @@ -0,0 +1,85 @@ +"""Regression test for #139 — orphaned 'pending' extraction rows are reconciled. + +A row can be ``_extraction_status='pending'`` with +``_extraction_requested_at IS NULL`` — never enqueued, invisible to the +drain (which selects ``requested_at IS NOT NULL``). ``requeue_stale`` heals only +'processing'. ``reconcile_orphaned_pending_extractions`` restores the +"eligible ⇒ queued" invariant, kind-agnostic, reusing the SAME eligibility +predicate as the queue reader (#140, G2) so cited_only/chunkless stubs are never +proactively enqueued. + +Runs OFFLINE — a fake pool captures executed SQL (same style as the sibling +extraction-queue tests). +""" + +from __future__ import annotations + +import asyncio + +import pytest + +from legal_mcp.services import db + + +class _FakePool: + def __init__(self) -> None: + self.executed: list[str] = [] + + async def execute(self, sql: str, *args): # noqa: ANN002 + self.executed.append(sql) + return "UPDATE 3" + + +@pytest.fixture() +def fake_pool(monkeypatch: pytest.MonkeyPatch) -> _FakePool: + pool = _FakePool() + + async def _get_pool() -> _FakePool: + return pool + + monkeypatch.setattr(db, "get_pool", _get_pool) + return pool + + +def _run(coro): + loop = asyncio.new_event_loop() + try: + return loop.run_until_complete(coro) + finally: + loop.close() + + +def _norm(sql: str) -> str: + return " ".join(sql.split()) + + +@pytest.mark.parametrize("kind,status_col,req_col", [ + ("halacha", "halacha_extraction_status", "halacha_extraction_requested_at"), + ("metadata", "metadata_extraction_status", "metadata_extraction_requested_at"), +]) +def test_reconcile_targets_eligible_unstamped_pending(fake_pool, kind, status_col, req_col): + n = _run(db.reconcile_orphaned_pending_extractions(kind=kind)) + assert n == 3 + sql = _norm(fake_pool.executed[0]) + # Only pending rows with NO queue stamp... + assert f"{status_col} = 'pending'" in sql, sql + assert f"{req_col} IS NULL" in sql, sql + # ...and only EXTRACTION-eligible ones (shared #140 predicate — no parallel rule). + assert _norm(db.EXTRACTION_ELIGIBLE_PREDICATE) in sql, sql + # It stamps the queue + re-affirms pending. + assert f"{req_col} = now()" in sql, sql + + +def test_reconcile_distinct_from_requeue_stale(fake_pool): + """reconcile handles 'pending'; requeue_stale handles 'processing' — separate.""" + _run(db.reconcile_orphaned_pending_extractions(kind="halacha")) + sql = _norm(fake_pool.executed[0]) + assert "= 'processing'" not in sql, sql + + +def test_request_halacha_sets_pending_status(fake_pool): + """#139 drift fix — request_halacha_extraction writes status+stamp together.""" + _run(db.request_halacha_extraction("00000000-0000-0000-0000-000000000000")) + sql = _norm(fake_pool.executed[0]) + assert "halacha_extraction_requested_at = now()" in sql, sql + assert "halacha_extraction_status = 'pending'" in sql, sql