diff --git a/mcp-server/src/legal_mcp/services/halacha_extractor.py b/mcp-server/src/legal_mcp/services/halacha_extractor.py index fcdedb6..c83373c 100644 --- a/mcp-server/src/legal_mcp/services/halacha_extractor.py +++ b/mcp-server/src/legal_mcp/services/halacha_extractor.py @@ -26,6 +26,8 @@ import logging import re from uuid import UUID +import asyncpg + from legal_mcp import config from legal_mcp.config import parse_llm_json from legal_mcp.services import ( @@ -50,6 +52,34 @@ CHUNK_CONCURRENCY = config.HALACHA_CHUNK_CONCURRENCY # 4-5 overlapping driver processes × CHUNK_CONCURRENCY each → 12-16 concurrent # xhigh `claude -p` procs → load 69 → hard reboot. _HALACHA_EXTRACT_LOCK_KEY = 0x48414C41 # 'HALA' + +# The advisory lock is a SESSION lock held on a dedicated ``lock_conn`` that +# sits idle while extraction work runs on OTHER pool connections. A hard crash +# ("RuntimeError: Event loop is closed", OOM-kill, container restart) skips the +# ``finally`` that unlocks AND skips ``pool.release`` — so the backend stays +# alive, idle, holding the lock, and EVERY future extraction gets ``busy`` +# forever (#142: a leaked lock froze all halacha extraction ~4.5 min on +# 2026-06-14 until a manual ``pg_terminate_backend``). Two mechanisms make the +# lock self-recovering: +# 1. KEEPALIVE — a background task touches ``lock_conn`` every +# ``_LOCK_KEEPALIVE_INTERVAL`` s, keeping ``pg_stat_activity.state_change`` +# fresh. A *live* extraction's lock-holder is therefore never stale; a +# crashed one's keepalive dies with the loop, so ``state_change`` freezes. +# 2. SELF-HEAL ON ACQUIRE — when ``pg_try_advisory_lock`` fails, we inspect +# the current holder; if it is idle AND its ``state_change`` is older than +# ``_LOCK_STALE_AFTER`` (≫ keepalive interval) it is a leaked orphan, so we +# ``pg_terminate_backend`` it (its session locks release on exit) and retry. +# This is why a session lock + keepalive is preferred over ``pg_advisory_xact_lock`` +# (option ג): an xact lock would force a multi-minute open transaction +# (idle-in-transaction bloat) and STILL wouldn't release a live-but-orphaned +# connection promptly. ``_LOCK_STALE_AFTER`` is large enough that an in-flight +# extraction is never mistaken for an orphan — the box-freeze the lock prevents +# must never be re-introduced by killing a live holder. +_LOCK_KEEPALIVE_INTERVAL = 30 # seconds between lock-conn keepalive touches +_LOCK_STALE_AFTER = 150 # seconds idle ⇒ leaked orphan (5× keepalive) +_LOCK_RECLAIM_RETRIES = 10 # poll attempts to re-acquire after terminate +_LOCK_RECLAIM_DELAY = 0.2 # seconds between reclaim polls + CHUNK_RETRY_ATTEMPTS = 1 # If at least this fraction of chunks crash and the precedent yields zero @@ -456,6 +486,82 @@ async def _extract_chunk( return [], False +# Reconstruct the 64-bit advisory key from pg_locks' (classid, objid) pair so +# the holder lookup is correct regardless of how Postgres splits the key. +_LOCK_HOLDER_SQL = """ +SELECT a.pid, + a.state, + EXTRACT(EPOCH FROM (now() - a.state_change)) AS idle_seconds +FROM pg_locks l +JOIN pg_stat_activity a ON a.pid = l.pid +WHERE l.locktype = 'advisory' + AND ((l.classid::bigint << 32) | (l.objid::bigint)) = $1 + AND l.objsubid = 1 + AND l.granted + AND a.pid <> pg_backend_pid() +LIMIT 1 +""" + + +async def _acquire_global_lock(pool) -> "asyncpg.Connection | None": + """Take the global advisory lock, self-healing a leaked (orphaned) holder. + + Returns a connection that HOLDS the lock, or ``None`` if a *live* extraction + legitimately holds it. On failure we look up the holder: only an **idle** + backend whose ``state_change`` is older than ``_LOCK_STALE_AFTER`` (i.e. its + keepalive stopped — a crash) is treated as a leaked orphan and terminated; + a live extraction's holder is kept fresh by its keepalive and is never + killed, so the serialization guarantee (and the box-freeze it prevents) is + preserved. + """ + conn = await pool.acquire() + try: + if await conn.fetchval("SELECT pg_try_advisory_lock($1)", + _HALACHA_EXTRACT_LOCK_KEY): + return conn + holder = await conn.fetchrow(_LOCK_HOLDER_SQL, _HALACHA_EXTRACT_LOCK_KEY) + if (holder and holder["state"] == "idle" + and (holder["idle_seconds"] or 0) >= _LOCK_STALE_AFTER): + logger.warning( + "halacha extract: reclaiming LEAKED lock — holder pid=%s idle " + "%.0fs (≥%ds, keepalive stopped → crashed). pg_terminate_backend.", + holder["pid"], holder["idle_seconds"], _LOCK_STALE_AFTER, + ) + await conn.execute("SELECT pg_terminate_backend($1)", holder["pid"]) + for _ in range(_LOCK_RECLAIM_RETRIES): + if await conn.fetchval("SELECT pg_try_advisory_lock($1)", + _HALACHA_EXTRACT_LOCK_KEY): + logger.info("halacha extract: leaked lock reclaimed.") + return conn + await asyncio.sleep(_LOCK_RECLAIM_DELAY) + await pool.release(conn) + return None + except Exception: + await pool.release(conn) + raise + + +async def _lock_keepalive(conn, stop: asyncio.Event) -> None: + """Touch ``conn`` every ``_LOCK_KEEPALIVE_INTERVAL`` s while extraction runs. + + Keeps the lock-holder's ``pg_stat_activity.state_change`` fresh so a live + extraction is never mistaken for a leaked orphan by ``_acquire_global_lock``. + Exits on ``stop`` (clean finish) or on any DB error (so the final unlock, + which reuses ``conn``, never races a keepalive query on the same connection). + """ + while not stop.is_set(): + try: + await asyncio.wait_for(stop.wait(), timeout=_LOCK_KEEPALIVE_INTERVAL) + return # stop signaled — clean exit + except asyncio.TimeoutError: + pass + try: + await conn.execute("SELECT 1") + except Exception as e: + logger.warning("halacha lock keepalive failed (stopping): %s", e) + return + + async def extract(case_law_id: UUID | str, force: bool = False, effort: str | None = None) -> dict: """Extract halachot from an uploaded precedent — globally serialized. @@ -483,28 +589,35 @@ async def extract(case_law_id: UUID | str, force: bool = False, case_law_id = UUID(case_law_id) pool = await db.get_pool() - lock_conn = await pool.acquire() - try: - got = await lock_conn.fetchval( - "SELECT pg_try_advisory_lock($1)", _HALACHA_EXTRACT_LOCK_KEY, + lock_conn = await _acquire_global_lock(pool) + if lock_conn is None: + logger.warning( + "halacha extract: global lock held by a live extraction — " + "skipping %s (stays pending for next drain)", case_law_id, ) - if not got: - logger.warning( - "halacha extract: global lock held by another extraction — " - "skipping %s (stays pending for next drain)", case_law_id, - ) - return { - "status": "busy", "extracted": 0, "stored": 0, - "case_law_id": str(case_law_id), - } + return { + "status": "busy", "extracted": 0, "stored": 0, + "case_law_id": str(case_law_id), + } + + stop_keepalive = asyncio.Event() + keepalive_task = asyncio.create_task(_lock_keepalive(lock_conn, stop_keepalive)) + try: + return await _extract_impl(case_law_id, force=force, effort=effort) + finally: + # Stop the keepalive and await it BEFORE reusing lock_conn for unlock — + # two coroutines must never query the same asyncpg connection at once. + stop_keepalive.set() + try: + await keepalive_task + except Exception: # pragma: no cover — keepalive swallows its own errors + logger.warning("halacha lock keepalive task ended abnormally") try: - return await _extract_impl(case_law_id, force=force, effort=effort) - finally: await lock_conn.fetchval( "SELECT pg_advisory_unlock($1)", _HALACHA_EXTRACT_LOCK_KEY, ) - finally: - await pool.release(lock_conn) + finally: + await pool.release(lock_conn) async def _select_extractable_chunks( diff --git a/mcp-server/tests/test_halacha_lock_selfheal.py b/mcp-server/tests/test_halacha_lock_selfheal.py new file mode 100644 index 0000000..9b80e96 --- /dev/null +++ b/mcp-server/tests/test_halacha_lock_selfheal.py @@ -0,0 +1,159 @@ +"""Regression test for TaskMaster #142 — leaked global advisory lock recovery. + +Bug (2026-06-14, CMP-174): when ``halacha_extractor`` crashed with +"RuntimeError: Event loop is closed", the ``finally`` that runs +``pg_advisory_unlock`` (and ``pool.release``) never executed. The dedicated +``lock_conn`` stayed alive, idle, holding the global advisory lock, so EVERY +subsequent ``extract`` returned ``status='busy'`` until a manual +``pg_terminate_backend`` (~4.5 min later) freed it. + +Fix: ``extract`` now (1) keeps the lock-holder's ``pg_stat_activity.state_change`` +fresh via a keepalive task, and (2) on a failed ``pg_try_advisory_lock`` inspects +the holder — terminating ONLY an idle backend whose ``state_change`` is older +than ``_LOCK_STALE_AFTER`` (its keepalive stopped ⇒ a crash), then re-acquiring. +A *live* extraction's holder is kept fresh and is never killed. + +Runs fully OFFLINE — a fake pool/connection captures SQL instead of hitting +Postgres (same style as ``test_halacha_reextract_preserves_approved.py``). +""" + +from __future__ import annotations + +import asyncio + +import pytest + +from legal_mcp.services import halacha_extractor as hx + + +class _LockConn: + """Fake asyncpg connection driving the advisory-lock SQL paths.""" + + def __init__(self, try_results: list[bool], holder: dict | None = None) -> None: + self._try = list(try_results) # queued pg_try_advisory_lock returns + self.holder = holder + self.executed: list[str] = [] + self.terminated: list[int] = [] + self.keepalive_touches = 0 + + async def fetchval(self, sql: str, *args): # noqa: ANN002 + s = sql.lower() + if "pg_try_advisory_lock" in s: + return self._try.pop(0) + if "pg_advisory_unlock" in s: + return True + return None + + async def fetchrow(self, sql: str, *args): # noqa: ANN002 + return self.holder + + async def execute(self, sql: str, *args): # noqa: ANN002 + self.executed.append(sql) + low = sql.lower() + if "pg_terminate_backend" in low: + self.terminated.append(args[0]) + if low.strip() == "select 1": + self.keepalive_touches += 1 + return "OK" + + +class _Pool: + def __init__(self, conn: _LockConn) -> None: + self.conn = conn + self.acquired = 0 + self.released = 0 + + async def acquire(self) -> _LockConn: + self.acquired += 1 + return self.conn + + async def release(self, conn: _LockConn) -> None: + self.released += 1 + + +def _run(coro): + loop = asyncio.new_event_loop() + try: + return loop.run_until_complete(coro) + finally: + loop.close() + + +def test_acquire_when_free_returns_held_conn() -> None: + conn = _LockConn(try_results=[True]) + pool = _Pool(conn) + got = _run(hx._acquire_global_lock(pool)) + assert got is conn + assert pool.released == 0 # caller keeps the held connection + + +def test_acquire_with_live_holder_returns_none() -> None: + # Holder is idle but FRESH (keepalive active) → live extraction, do not kill. + conn = _LockConn( + try_results=[False], + holder={"pid": 555, "state": "idle", "idle_seconds": 12.0}, + ) + pool = _Pool(conn) + got = _run(hx._acquire_global_lock(pool)) + assert got is None + assert conn.terminated == [] # never terminate a live holder + assert pool.released == 1 # connection handed back + + +def test_acquire_does_not_kill_active_holder() -> None: + # Even if stale-by-time, an 'active' backend is doing work — never killed. + conn = _LockConn( + try_results=[False], + holder={"pid": 556, "state": "active", "idle_seconds": 9999.0}, + ) + pool = _Pool(conn) + got = _run(hx._acquire_global_lock(pool)) + assert got is None + assert conn.terminated == [] + + +def test_acquire_reclaims_leaked_orphan() -> None: + # Idle holder past the stale threshold ⇒ leaked orphan: terminate + reacquire. + stale = hx._LOCK_STALE_AFTER + 10 + conn = _LockConn( + try_results=[False, True], # fail, then succeed after terminate + holder={"pid": 777, "state": "idle", "idle_seconds": float(stale)}, + ) + pool = _Pool(conn) + got = _run(hx._acquire_global_lock(pool)) + assert got is conn + assert conn.terminated == [777] # the orphan was terminated + assert pool.released == 0 # lock reclaimed → conn retained + + +def test_acquire_no_holder_row_releases() -> None: + # Lock released between the failed try and the holder lookup → give up cleanly. + conn = _LockConn(try_results=[False], holder=None) + pool = _Pool(conn) + got = _run(hx._acquire_global_lock(pool)) + assert got is None + assert pool.released == 1 + + +def test_keepalive_exits_on_stop_without_touching() -> None: + conn = _LockConn(try_results=[]) + stop = asyncio.Event() + stop.set() # already signaled → must return immediately, no SELECT 1 + _run(hx._lock_keepalive(conn, stop)) + assert conn.keepalive_touches == 0 + + +def test_extract_returns_busy_when_lock_unavailable(monkeypatch) -> None: + async def _no_lock(_pool): + return None + + async def _fake_pool(): + return object() + + monkeypatch.setattr(hx, "_acquire_global_lock", _no_lock) + monkeypatch.setattr(hx.db, "get_pool", _fake_pool) + + from uuid import uuid4 + result = _run(hx.extract(uuid4())) + assert result["status"] == "busy" + assert result["extracted"] == 0