"""Regression test for TaskMaster #142 — leaked global advisory lock recovery. Bug (2026-06-14, CMP-174): when ``halacha_extractor`` crashed with "RuntimeError: Event loop is closed", the ``finally`` that runs ``pg_advisory_unlock`` (and ``pool.release``) never executed. The dedicated ``lock_conn`` stayed alive, idle, holding the global advisory lock, so EVERY subsequent ``extract`` returned ``status='busy'`` until a manual ``pg_terminate_backend`` (~4.5 min later) freed it. Fix: ``extract`` now (1) keeps the lock-holder's ``pg_stat_activity.state_change`` fresh via a keepalive task, and (2) on a failed ``pg_try_advisory_lock`` inspects the holder — terminating ONLY an idle backend whose ``state_change`` is older than ``_LOCK_STALE_AFTER`` (its keepalive stopped ⇒ a crash), then re-acquiring. A *live* extraction's holder is kept fresh and is never killed. Runs fully OFFLINE — a fake pool/connection captures SQL instead of hitting Postgres (same style as ``test_halacha_reextract_preserves_approved.py``). """ from __future__ import annotations import asyncio import pytest from legal_mcp.services import halacha_extractor as hx class _LockConn: """Fake asyncpg connection driving the advisory-lock SQL paths.""" def __init__(self, try_results: list[bool], holder: dict | None = None) -> None: self._try = list(try_results) # queued pg_try_advisory_lock returns self.holder = holder self.executed: list[str] = [] self.terminated: list[int] = [] self.keepalive_touches = 0 async def fetchval(self, sql: str, *args): # noqa: ANN002 s = sql.lower() if "pg_try_advisory_lock" in s: return self._try.pop(0) if "pg_advisory_unlock" in s: return True return None async def fetchrow(self, sql: str, *args): # noqa: ANN002 return self.holder async def execute(self, sql: str, *args): # noqa: ANN002 self.executed.append(sql) low = sql.lower() if "pg_terminate_backend" in low: self.terminated.append(args[0]) if low.strip() == "select 1": self.keepalive_touches += 1 return "OK" class _Pool: def __init__(self, conn: _LockConn) -> None: self.conn = conn self.acquired = 0 self.released = 0 async def acquire(self) -> _LockConn: self.acquired += 1 return self.conn async def release(self, conn: _LockConn) -> None: self.released += 1 def _run(coro): loop = asyncio.new_event_loop() try: return loop.run_until_complete(coro) finally: loop.close() def test_acquire_when_free_returns_held_conn() -> None: conn = _LockConn(try_results=[True]) pool = _Pool(conn) got = _run(hx._acquire_global_lock(pool)) assert got is conn assert pool.released == 0 # caller keeps the held connection def test_acquire_with_live_holder_returns_none() -> None: # Holder is idle but FRESH (keepalive active) → live extraction, do not kill. conn = _LockConn( try_results=[False], holder={"pid": 555, "state": "idle", "idle_seconds": 12.0}, ) pool = _Pool(conn) got = _run(hx._acquire_global_lock(pool)) assert got is None assert conn.terminated == [] # never terminate a live holder assert pool.released == 1 # connection handed back def test_acquire_does_not_kill_active_holder() -> None: # Even if stale-by-time, an 'active' backend is doing work — never killed. conn = _LockConn( try_results=[False], holder={"pid": 556, "state": "active", "idle_seconds": 9999.0}, ) pool = _Pool(conn) got = _run(hx._acquire_global_lock(pool)) assert got is None assert conn.terminated == [] def test_acquire_reclaims_leaked_orphan() -> None: # Idle holder past the stale threshold ⇒ leaked orphan: terminate + reacquire. stale = hx._LOCK_STALE_AFTER + 10 conn = _LockConn( try_results=[False, True], # fail, then succeed after terminate holder={"pid": 777, "state": "idle", "idle_seconds": float(stale)}, ) pool = _Pool(conn) got = _run(hx._acquire_global_lock(pool)) assert got is conn assert conn.terminated == [777] # the orphan was terminated assert pool.released == 0 # lock reclaimed → conn retained def test_acquire_no_holder_row_releases() -> None: # Lock released between the failed try and the holder lookup → give up cleanly. conn = _LockConn(try_results=[False], holder=None) pool = _Pool(conn) got = _run(hx._acquire_global_lock(pool)) assert got is None assert pool.released == 1 def test_keepalive_exits_on_stop_without_touching() -> None: conn = _LockConn(try_results=[]) stop = asyncio.Event() stop.set() # already signaled → must return immediately, no SELECT 1 _run(hx._lock_keepalive(conn, stop)) assert conn.keepalive_touches == 0 def test_extract_returns_busy_when_lock_unavailable(monkeypatch) -> None: async def _no_lock(_pool): return None async def _fake_pool(): return object() monkeypatch.setattr(hx, "_acquire_global_lock", _no_lock) monkeypatch.setattr(hx.db, "get_pool", _fake_pool) from uuid import uuid4 result = _run(hx.extract(uuid4())) assert result["status"] == "busy" assert result["extracted"] == 0