diff --git a/mcp-server/src/legal_mcp/services/db.py b/mcp-server/src/legal_mcp/services/db.py index 77c75e5..aec33ef 100644 --- a/mcp-server/src/legal_mcp/services/db.py +++ b/mcp-server/src/legal_mcp/services/db.py @@ -6736,10 +6736,17 @@ async def request_metadata_extraction(case_law_id: UUID) -> bool: async def request_halacha_extraction(case_law_id: UUID) -> bool: """Same but for halacha extraction. See note on - :func:`request_metadata_extraction` re: opening to all source kinds.""" + :func:`request_metadata_extraction` re: opening to all source kinds. + + Sets ``halacha_extraction_status='pending'`` alongside the timestamp — + symmetric to :func:`request_metadata_extraction` — so status and queue-stamp + are written together and a re-request never leaves a stale terminal badge. + This also closes the drift window where a row could end up 'pending' with a + NULL requested_at (orphaned from the queue) (#139).""" pool = await get_pool() result = await pool.execute( - "UPDATE case_law SET halacha_extraction_requested_at = now() " + "UPDATE case_law SET halacha_extraction_requested_at = now(), " + "halacha_extraction_status = 'pending' " "WHERE id = $1", case_law_id, ) @@ -6828,6 +6835,43 @@ async def requeue_stale_processing_extractions(kind: str = "halacha") -> int: return 0 +async def reconcile_orphaned_pending_extractions(kind: str = "halacha") -> int: + """Re-stamp eligible 'pending' rows that fell off the queue. Returns count. + + ``requeue_stale_processing_extractions`` heals only ``status='processing'`` + orphans. A row can also be ``status='pending'`` with ``requested_at IS NULL`` + — never enqueued (bulk/migration paths, or a status set before the stamp) — + and the queue (``requested_at IS NOT NULL``) is blind to it forever, so the + backlog drains silently to nothing (#139, INV-DUR1). 
+ + This restores the "eligible row ⇒ in the queue" invariant: it stamps + ``requested_at`` (and re-affirms 'pending') for rows that are pending, + unstamped, and EXTRACTION-eligible — using the SAME + ``EXTRACTION_ELIGIBLE_PREDICATE`` the queue reader uses (#140, G2), so + cited_only / chunkless stubs are never proactively enqueued. kind-agnostic + (metadata + halacha). Idempotent: a row already stamped is not matched, so + re-runs never overwrite an existing stamp; concurrent runs should be harmless. + """ + status_col = ( + "metadata_extraction_status" if kind == "metadata" + else "halacha_extraction_status" + ) + req_col = ( + "metadata_extraction_requested_at" if kind == "metadata" + else "halacha_extraction_requested_at" + ) + pool = await get_pool() + tag = await pool.execute( + f"UPDATE case_law SET {req_col} = now(), {status_col} = 'pending' " + f"WHERE {status_col} = 'pending' AND {req_col} IS NULL " + f"AND {EXTRACTION_ELIGIBLE_PREDICATE}" + ) + try: + return int(str(tag).split()[-1]) + except (ValueError, IndexError): + return 0 + + +async def extraction_queue_status() -> dict: """Pending-extraction queue depth per kind (INV-TOOL4 visibility / GAP-45). diff --git a/mcp-server/src/legal_mcp/services/precedent_library.py b/mcp-server/src/legal_mcp/services/precedent_library.py index 1aa8012..d153000 100644 --- a/mcp-server/src/legal_mcp/services/precedent_library.py +++ b/mcp-server/src/legal_mcp/services/precedent_library.py @@ -231,6 +231,17 @@ async def process_pending_extractions(kind: str = "metadata", limit: int = 20) - if healed: logger.warning("self-healed %d stale '%s' processing row(s)", healed, kind) + # Re-enqueue eligible 'pending' rows that never got a queue stamp (orphaned + # from the queue — bulk/migration paths). requeue_stale only covers + # 'processing'; this covers 'pending' with requested_at IS NULL (#139). Runs + # before the list below so reclaimed rows drain in this same pass. 
+ reconciled = await db.reconcile_orphaned_pending_extractions(kind=kind) + if reconciled: + logger.warning( + "reconciled %d orphaned 'pending' (no queue stamp) '%s' row(s)", + reconciled, kind, + ) + pending = await db.list_pending_extraction_requests(kind=kind, limit=limit) if not pending: return {"status": "no_pending", "kind": kind, "processed": 0, "results": []} diff --git a/mcp-server/tests/test_extraction_orphan_reconcile.py b/mcp-server/tests/test_extraction_orphan_reconcile.py new file mode 100644 index 0000000..0160ea0 --- /dev/null +++ b/mcp-server/tests/test_extraction_orphan_reconcile.py @@ -0,0 +1,85 @@ +"""Regression test for #139 — orphaned 'pending' extraction rows are reconciled. + +A row can be ``{kind}_extraction_status='pending'`` with +``{kind}_extraction_requested_at IS NULL`` — never enqueued, invisible to the +drain (which selects ``requested_at IS NOT NULL``). ``requeue_stale`` heals only +'processing'. ``reconcile_orphaned_pending_extractions`` restores the +"eligible ⇒ queued" invariant, kind-agnostic, reusing the SAME eligibility +predicate as the queue reader (#140, G2) so cited_only/chunkless stubs are never +proactively enqueued. + +Runs OFFLINE — a fake pool captures executed SQL (same style as the sibling +extraction-queue tests). 
+""" + +from __future__ import annotations + +import asyncio + +import pytest + +from legal_mcp.services import db + + +class _FakePool: + def __init__(self) -> None: + self.executed: list[str] = [] + + async def execute(self, sql: str, *args): # noqa: ANN002 + self.executed.append(sql) + return "UPDATE 3" + + +@pytest.fixture() +def fake_pool(monkeypatch: pytest.MonkeyPatch) -> _FakePool: + pool = _FakePool() + + async def _get_pool() -> _FakePool: + return pool + + monkeypatch.setattr(db, "get_pool", _get_pool) + return pool + + +def _run(coro): + loop = asyncio.new_event_loop() + try: + return loop.run_until_complete(coro) + finally: + loop.close() + + +def _norm(sql: str) -> str: + return " ".join(sql.split()) + + +@pytest.mark.parametrize("kind,status_col,req_col", [ + ("halacha", "halacha_extraction_status", "halacha_extraction_requested_at"), + ("metadata", "metadata_extraction_status", "metadata_extraction_requested_at"), +]) +def test_reconcile_targets_eligible_unstamped_pending(fake_pool, kind, status_col, req_col): + n = _run(db.reconcile_orphaned_pending_extractions(kind=kind)) + assert n == 3 + sql = _norm(fake_pool.executed[0]) + # Only pending rows with NO queue stamp... + assert f"{status_col} = 'pending'" in sql, sql + assert f"{req_col} IS NULL" in sql, sql + # ...and only EXTRACTION-eligible ones (shared #140 predicate — no parallel rule). + assert _norm(db.EXTRACTION_ELIGIBLE_PREDICATE) in sql, sql + # It stamps the queue; the SET also re-affirms 'pending', which the status check above cannot tell apart from the WHERE match. 
+ assert f"{req_col} = now()" in sql, sql + + +def test_reconcile_distinct_from_requeue_stale(fake_pool): + """reconcile handles 'pending'; requeue_stale handles 'processing' — separate.""" + _run(db.reconcile_orphaned_pending_extractions(kind="halacha")) + sql = _norm(fake_pool.executed[0]) + assert "= 'processing'" not in sql, sql + + +def test_request_halacha_sets_pending_status(fake_pool): + """#139 drift fix — request_halacha_extraction writes status+stamp together.""" + _run(db.request_halacha_extraction("00000000-0000-0000-0000-000000000000")) + sql = _norm(fake_pool.executed[0]) + assert "halacha_extraction_requested_at = now()" in sql, sql + assert "halacha_extraction_status = 'pending'" in sql, sql