fix(extraction): reconcile לתיקים-יתומים בתור — pending+requested_at=NULL (#139) #264

Merged
chaim merged 1 commits from worktree-extraction-orphan-reconcile into main 2026-06-15 04:11:19 +00:00
3 changed files with 142 additions and 2 deletions
Showing only changes of commit 2343892220 - Show all commits

View File

@@ -6736,10 +6736,17 @@ async def request_metadata_extraction(case_law_id: UUID) -> bool:
async def request_halacha_extraction(case_law_id: UUID) -> bool: async def request_halacha_extraction(case_law_id: UUID) -> bool:
"""Same but for halacha extraction. See note on """Same but for halacha extraction. See note on
:func:`request_metadata_extraction` re: opening to all source kinds.""" :func:`request_metadata_extraction` re: opening to all source kinds.
Sets ``halacha_extraction_status='pending'`` alongside the timestamp —
symmetric to :func:`request_metadata_extraction` — so status and queue-stamp
are written together and a re-request never leaves a stale terminal badge.
This also closes the drift window where a row could end up 'pending' with a
NULL requested_at (orphaned from the queue) (#139)."""
pool = await get_pool() pool = await get_pool()
result = await pool.execute( result = await pool.execute(
"UPDATE case_law SET halacha_extraction_requested_at = now() " "UPDATE case_law SET halacha_extraction_requested_at = now(), "
"halacha_extraction_status = 'pending' "
"WHERE id = $1", "WHERE id = $1",
case_law_id, case_law_id,
) )
@@ -6828,6 +6835,43 @@ async def requeue_stale_processing_extractions(kind: str = "halacha") -> int:
return 0 return 0
async def reconcile_orphaned_pending_extractions(kind: str = "halacha") -> int:
"""Re-stamp eligible 'pending' rows that fell off the queue. Returns count.
``requeue_stale_processing_extractions`` heals only ``status='processing'``
orphans. A row can also be ``status='pending'`` with ``requested_at IS NULL``
— never enqueued (bulk/migration paths, or a status set before the stamp) —
and the queue (``requested_at IS NOT NULL``) is blind to it forever, so the
backlog drains silently to nothing (#139, INV-DUR1).
This restores the "eligible row ⇒ in the queue" invariant: it stamps
``requested_at`` (and re-affirms 'pending') for rows that are pending,
unstamped, and EXTRACTION-eligible — using the SAME
``EXTRACTION_ELIGIBLE_PREDICATE`` the queue reader uses (#140, G2), so
cited_only / chunkless stubs are never proactively enqueued. kind-agnostic
(metadata + halacha). Idempotent: a row already stamped is not matched, so
concurrent runs converge on the same ``now()`` stamp harmlessly.
"""
status_col = (
"metadata_extraction_status" if kind == "metadata"
else "halacha_extraction_status"
)
req_col = (
"metadata_extraction_requested_at" if kind == "metadata"
else "halacha_extraction_requested_at"
)
pool = await get_pool()
tag = await pool.execute(
f"UPDATE case_law SET {req_col} = now(), {status_col} = 'pending' "
f"WHERE {status_col} = 'pending' AND {req_col} IS NULL "
f"AND {EXTRACTION_ELIGIBLE_PREDICATE}"
)
try:
return int(str(tag).split()[-1])
except (ValueError, IndexError):
return 0
async def extraction_queue_status() -> dict: async def extraction_queue_status() -> dict:
"""Pending-extraction queue depth per kind (INV-TOOL4 visibility / GAP-45). """Pending-extraction queue depth per kind (INV-TOOL4 visibility / GAP-45).

View File

@@ -231,6 +231,17 @@ async def process_pending_extractions(kind: str = "metadata", limit: int = 20) -
if healed: if healed:
logger.warning("self-healed %d stale '%s' processing row(s)", healed, kind) logger.warning("self-healed %d stale '%s' processing row(s)", healed, kind)
# Re-enqueue eligible 'pending' rows that never got a queue stamp (orphaned
# from the queue — bulk/migration paths). requeue_stale only covers
# 'processing'; this covers 'pending' with requested_at IS NULL (#139). Runs
# before the list below so reclaimed rows drain in this same pass.
reconciled = await db.reconcile_orphaned_pending_extractions(kind=kind)
if reconciled:
logger.warning(
"reconciled %d orphaned 'pending' (no queue stamp) '%s' row(s)",
reconciled, kind,
)
pending = await db.list_pending_extraction_requests(kind=kind, limit=limit) pending = await db.list_pending_extraction_requests(kind=kind, limit=limit)
if not pending: if not pending:
return {"status": "no_pending", "kind": kind, "processed": 0, "results": []} return {"status": "no_pending", "kind": kind, "processed": 0, "results": []}

View File

@@ -0,0 +1,85 @@
"""Regression test for #139 — orphaned 'pending' extraction rows are reconciled.
A row can be ``<kind>_extraction_status='pending'`` with
``<kind>_extraction_requested_at IS NULL`` — never enqueued, invisible to the
drain (which selects ``requested_at IS NOT NULL``). ``requeue_stale`` heals only
'processing'. ``reconcile_orphaned_pending_extractions`` restores the
"eligible ⇒ queued" invariant, kind-agnostic, reusing the SAME eligibility
predicate as the queue reader (#140, G2) so cited_only/chunkless stubs are never
proactively enqueued.
Runs OFFLINE — a fake pool captures executed SQL (same style as the sibling
extraction-queue tests).
"""
from __future__ import annotations
import asyncio
import pytest
from legal_mcp.services import db
class _FakePool:
def __init__(self) -> None:
self.executed: list[str] = []
async def execute(self, sql: str, *args): # noqa: ANN002
self.executed.append(sql)
return "UPDATE 3"
@pytest.fixture()
def fake_pool(monkeypatch: pytest.MonkeyPatch) -> _FakePool:
pool = _FakePool()
async def _get_pool() -> _FakePool:
return pool
monkeypatch.setattr(db, "get_pool", _get_pool)
return pool
def _run(coro):
loop = asyncio.new_event_loop()
try:
return loop.run_until_complete(coro)
finally:
loop.close()
def _norm(sql: str) -> str:
return " ".join(sql.split())
@pytest.mark.parametrize("kind,status_col,req_col", [
("halacha", "halacha_extraction_status", "halacha_extraction_requested_at"),
("metadata", "metadata_extraction_status", "metadata_extraction_requested_at"),
])
def test_reconcile_targets_eligible_unstamped_pending(fake_pool, kind, status_col, req_col):
n = _run(db.reconcile_orphaned_pending_extractions(kind=kind))
assert n == 3
sql = _norm(fake_pool.executed[0])
# Only pending rows with NO queue stamp...
assert f"{status_col} = 'pending'" in sql, sql
assert f"{req_col} IS NULL" in sql, sql
# ...and only EXTRACTION-eligible ones (shared #140 predicate — no parallel rule).
assert _norm(db.EXTRACTION_ELIGIBLE_PREDICATE) in sql, sql
# It stamps the queue + re-affirms pending.
assert f"{req_col} = now()" in sql, sql
def test_reconcile_distinct_from_requeue_stale(fake_pool):
"""reconcile handles 'pending'; requeue_stale handles 'processing' — separate."""
_run(db.reconcile_orphaned_pending_extractions(kind="halacha"))
sql = _norm(fake_pool.executed[0])
assert "= 'processing'" not in sql, sql
def test_request_halacha_sets_pending_status(fake_pool):
"""#139 drift fix — request_halacha_extraction writes status+stamp together."""
_run(db.request_halacha_extraction("00000000-0000-0000-0000-000000000000"))
sql = _norm(fake_pool.executed[0])
assert "halacha_extraction_requested_at = now()" in sql, sql
assert "halacha_extraction_status = 'pending'" in sql, sql