fix(halacha): שחזור-עצמי לנעילת-advisory דלופה — לא לחסום חילוץ-הלכות (#142) #258
@@ -26,6 +26,8 @@ import logging
|
|||||||
import re
|
import re
|
||||||
from uuid import UUID
|
from uuid import UUID
|
||||||
|
|
||||||
|
import asyncpg
|
||||||
|
|
||||||
from legal_mcp import config
|
from legal_mcp import config
|
||||||
from legal_mcp.config import parse_llm_json
|
from legal_mcp.config import parse_llm_json
|
||||||
from legal_mcp.services import (
|
from legal_mcp.services import (
|
||||||
@@ -50,6 +52,34 @@ CHUNK_CONCURRENCY = config.HALACHA_CHUNK_CONCURRENCY
|
|||||||
# 4-5 overlapping driver processes × CHUNK_CONCURRENCY each → 12-16 concurrent
|
# 4-5 overlapping driver processes × CHUNK_CONCURRENCY each → 12-16 concurrent
|
||||||
# xhigh `claude -p` procs → load 69 → hard reboot.
|
# xhigh `claude -p` procs → load 69 → hard reboot.
|
||||||
_HALACHA_EXTRACT_LOCK_KEY = 0x48414C41 # 'HALA'
|
_HALACHA_EXTRACT_LOCK_KEY = 0x48414C41 # 'HALA'
|
||||||
|
|
||||||
|
# The advisory lock is a SESSION lock held on a dedicated ``lock_conn`` that
|
||||||
|
# sits idle while extraction work runs on OTHER pool connections. A hard crash
|
||||||
|
# ("RuntimeError: Event loop is closed", OOM-kill, container restart) skips the
|
||||||
|
# ``finally`` that unlocks AND skips ``pool.release`` — so the backend stays
|
||||||
|
# alive, idle, holding the lock, and EVERY future extraction gets ``busy``
|
||||||
|
# forever (#142: a leaked lock froze all halacha extraction ~4.5 min on
|
||||||
|
# 2026-06-14 until a manual ``pg_terminate_backend``). Two mechanisms make the
|
||||||
|
# lock self-recovering:
|
||||||
|
# 1. KEEPALIVE — a background task touches ``lock_conn`` every
|
||||||
|
# ``_LOCK_KEEPALIVE_INTERVAL`` s, keeping ``pg_stat_activity.state_change``
|
||||||
|
# fresh. A *live* extraction's lock-holder is therefore never stale; a
|
||||||
|
# crashed one's keepalive dies with the loop, so ``state_change`` freezes.
|
||||||
|
# 2. SELF-HEAL ON ACQUIRE — when ``pg_try_advisory_lock`` fails, we inspect
|
||||||
|
# the current holder; if it is idle AND its ``state_change`` is older than
|
||||||
|
# ``_LOCK_STALE_AFTER`` (≫ keepalive interval) it is a leaked orphan, so we
|
||||||
|
# ``pg_terminate_backend`` it (its session locks release on exit) and retry.
|
||||||
|
# This is why a session lock + keepalive is preferred over ``pg_advisory_xact_lock``
|
||||||
|
# (option ג): an xact lock would force a multi-minute open transaction
|
||||||
|
# (idle-in-transaction bloat) and STILL wouldn't release a live-but-orphaned
|
||||||
|
# connection promptly. ``_LOCK_STALE_AFTER`` is large enough that an in-flight
|
||||||
|
# extraction is never mistaken for an orphan — the box-freeze the lock prevents
|
||||||
|
# must never be re-introduced by killing a live holder.
|
||||||
|
_LOCK_KEEPALIVE_INTERVAL = 30 # seconds between lock-conn keepalive touches
|
||||||
|
_LOCK_STALE_AFTER = 150 # seconds idle ⇒ leaked orphan (5× keepalive)
|
||||||
|
_LOCK_RECLAIM_RETRIES = 10 # poll attempts to re-acquire after terminate
|
||||||
|
_LOCK_RECLAIM_DELAY = 0.2 # seconds between reclaim polls
|
||||||
|
|
||||||
CHUNK_RETRY_ATTEMPTS = 1
|
CHUNK_RETRY_ATTEMPTS = 1
|
||||||
|
|
||||||
# If at least this fraction of chunks crash and the precedent yields zero
|
# If at least this fraction of chunks crash and the precedent yields zero
|
||||||
@@ -456,6 +486,82 @@ async def _extract_chunk(
|
|||||||
return [], False
|
return [], False
|
||||||
|
|
||||||
|
|
||||||
|
# Reconstruct the 64-bit advisory key from pg_locks' (classid, objid) pair so
|
||||||
|
# the holder lookup is correct regardless of how Postgres splits the key.
|
||||||
|
_LOCK_HOLDER_SQL = """
|
||||||
|
SELECT a.pid,
|
||||||
|
a.state,
|
||||||
|
EXTRACT(EPOCH FROM (now() - a.state_change)) AS idle_seconds
|
||||||
|
FROM pg_locks l
|
||||||
|
JOIN pg_stat_activity a ON a.pid = l.pid
|
||||||
|
WHERE l.locktype = 'advisory'
|
||||||
|
AND ((l.classid::bigint << 32) | (l.objid::bigint)) = $1
|
||||||
|
AND l.objsubid = 1
|
||||||
|
AND l.granted
|
||||||
|
AND a.pid <> pg_backend_pid()
|
||||||
|
LIMIT 1
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
async def _acquire_global_lock(pool) -> "asyncpg.Connection | None":
|
||||||
|
"""Take the global advisory lock, self-healing a leaked (orphaned) holder.
|
||||||
|
|
||||||
|
Returns a connection that HOLDS the lock, or ``None`` if a *live* extraction
|
||||||
|
legitimately holds it. On failure we look up the holder: only an **idle**
|
||||||
|
backend whose ``state_change`` is older than ``_LOCK_STALE_AFTER`` (i.e. its
|
||||||
|
keepalive stopped — a crash) is treated as a leaked orphan and terminated;
|
||||||
|
a live extraction's holder is kept fresh by its keepalive and is never
|
||||||
|
killed, so the serialization guarantee (and the box-freeze it prevents) is
|
||||||
|
preserved.
|
||||||
|
"""
|
||||||
|
conn = await pool.acquire()
|
||||||
|
try:
|
||||||
|
if await conn.fetchval("SELECT pg_try_advisory_lock($1)",
|
||||||
|
_HALACHA_EXTRACT_LOCK_KEY):
|
||||||
|
return conn
|
||||||
|
holder = await conn.fetchrow(_LOCK_HOLDER_SQL, _HALACHA_EXTRACT_LOCK_KEY)
|
||||||
|
if (holder and holder["state"] == "idle"
|
||||||
|
and (holder["idle_seconds"] or 0) >= _LOCK_STALE_AFTER):
|
||||||
|
logger.warning(
|
||||||
|
"halacha extract: reclaiming LEAKED lock — holder pid=%s idle "
|
||||||
|
"%.0fs (≥%ds, keepalive stopped → crashed). pg_terminate_backend.",
|
||||||
|
holder["pid"], holder["idle_seconds"], _LOCK_STALE_AFTER,
|
||||||
|
)
|
||||||
|
await conn.execute("SELECT pg_terminate_backend($1)", holder["pid"])
|
||||||
|
for _ in range(_LOCK_RECLAIM_RETRIES):
|
||||||
|
if await conn.fetchval("SELECT pg_try_advisory_lock($1)",
|
||||||
|
_HALACHA_EXTRACT_LOCK_KEY):
|
||||||
|
logger.info("halacha extract: leaked lock reclaimed.")
|
||||||
|
return conn
|
||||||
|
await asyncio.sleep(_LOCK_RECLAIM_DELAY)
|
||||||
|
await pool.release(conn)
|
||||||
|
return None
|
||||||
|
except Exception:
|
||||||
|
await pool.release(conn)
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
async def _lock_keepalive(conn, stop: asyncio.Event) -> None:
|
||||||
|
"""Touch ``conn`` every ``_LOCK_KEEPALIVE_INTERVAL`` s while extraction runs.
|
||||||
|
|
||||||
|
Keeps the lock-holder's ``pg_stat_activity.state_change`` fresh so a live
|
||||||
|
extraction is never mistaken for a leaked orphan by ``_acquire_global_lock``.
|
||||||
|
Exits on ``stop`` (clean finish) or on any DB error (so the final unlock,
|
||||||
|
which reuses ``conn``, never races a keepalive query on the same connection).
|
||||||
|
"""
|
||||||
|
while not stop.is_set():
|
||||||
|
try:
|
||||||
|
await asyncio.wait_for(stop.wait(), timeout=_LOCK_KEEPALIVE_INTERVAL)
|
||||||
|
return # stop signaled — clean exit
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
await conn.execute("SELECT 1")
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("halacha lock keepalive failed (stopping): %s", e)
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
async def extract(case_law_id: UUID | str, force: bool = False,
|
async def extract(case_law_id: UUID | str, force: bool = False,
|
||||||
effort: str | None = None) -> dict:
|
effort: str | None = None) -> dict:
|
||||||
"""Extract halachot from an uploaded precedent — globally serialized.
|
"""Extract halachot from an uploaded precedent — globally serialized.
|
||||||
@@ -483,23 +589,30 @@ async def extract(case_law_id: UUID | str, force: bool = False,
|
|||||||
case_law_id = UUID(case_law_id)
|
case_law_id = UUID(case_law_id)
|
||||||
|
|
||||||
pool = await db.get_pool()
|
pool = await db.get_pool()
|
||||||
lock_conn = await pool.acquire()
|
lock_conn = await _acquire_global_lock(pool)
|
||||||
try:
|
if lock_conn is None:
|
||||||
got = await lock_conn.fetchval(
|
|
||||||
"SELECT pg_try_advisory_lock($1)", _HALACHA_EXTRACT_LOCK_KEY,
|
|
||||||
)
|
|
||||||
if not got:
|
|
||||||
logger.warning(
|
logger.warning(
|
||||||
"halacha extract: global lock held by another extraction — "
|
"halacha extract: global lock held by a live extraction — "
|
||||||
"skipping %s (stays pending for next drain)", case_law_id,
|
"skipping %s (stays pending for next drain)", case_law_id,
|
||||||
)
|
)
|
||||||
return {
|
return {
|
||||||
"status": "busy", "extracted": 0, "stored": 0,
|
"status": "busy", "extracted": 0, "stored": 0,
|
||||||
"case_law_id": str(case_law_id),
|
"case_law_id": str(case_law_id),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
stop_keepalive = asyncio.Event()
|
||||||
|
keepalive_task = asyncio.create_task(_lock_keepalive(lock_conn, stop_keepalive))
|
||||||
try:
|
try:
|
||||||
return await _extract_impl(case_law_id, force=force, effort=effort)
|
return await _extract_impl(case_law_id, force=force, effort=effort)
|
||||||
finally:
|
finally:
|
||||||
|
# Stop the keepalive and await it BEFORE reusing lock_conn for unlock —
|
||||||
|
# two coroutines must never query the same asyncpg connection at once.
|
||||||
|
stop_keepalive.set()
|
||||||
|
try:
|
||||||
|
await keepalive_task
|
||||||
|
except Exception: # pragma: no cover — keepalive swallows its own errors
|
||||||
|
logger.warning("halacha lock keepalive task ended abnormally")
|
||||||
|
try:
|
||||||
await lock_conn.fetchval(
|
await lock_conn.fetchval(
|
||||||
"SELECT pg_advisory_unlock($1)", _HALACHA_EXTRACT_LOCK_KEY,
|
"SELECT pg_advisory_unlock($1)", _HALACHA_EXTRACT_LOCK_KEY,
|
||||||
)
|
)
|
||||||
|
|||||||
159
mcp-server/tests/test_halacha_lock_selfheal.py
Normal file
159
mcp-server/tests/test_halacha_lock_selfheal.py
Normal file
@@ -0,0 +1,159 @@
|
|||||||
|
"""Regression test for TaskMaster #142 — leaked global advisory lock recovery.
|
||||||
|
|
||||||
|
Bug (2026-06-14, CMP-174): when ``halacha_extractor`` crashed with
|
||||||
|
"RuntimeError: Event loop is closed", the ``finally`` that runs
|
||||||
|
``pg_advisory_unlock`` (and ``pool.release``) never executed. The dedicated
|
||||||
|
``lock_conn`` stayed alive, idle, holding the global advisory lock, so EVERY
|
||||||
|
subsequent ``extract`` returned ``status='busy'`` until a manual
|
||||||
|
``pg_terminate_backend`` (~4.5 min later) freed it.
|
||||||
|
|
||||||
|
Fix: ``extract`` now (1) keeps the lock-holder's ``pg_stat_activity.state_change``
|
||||||
|
fresh via a keepalive task, and (2) on a failed ``pg_try_advisory_lock`` inspects
|
||||||
|
the holder — terminating ONLY an idle backend whose ``state_change`` is older
|
||||||
|
than ``_LOCK_STALE_AFTER`` (its keepalive stopped ⇒ a crash), then re-acquiring.
|
||||||
|
A *live* extraction's holder is kept fresh and is never killed.
|
||||||
|
|
||||||
|
Runs fully OFFLINE — a fake pool/connection captures SQL instead of hitting
|
||||||
|
Postgres (same style as ``test_halacha_reextract_preserves_approved.py``).
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from legal_mcp.services import halacha_extractor as hx
|
||||||
|
|
||||||
|
|
||||||
|
class _LockConn:
|
||||||
|
"""Fake asyncpg connection driving the advisory-lock SQL paths."""
|
||||||
|
|
||||||
|
def __init__(self, try_results: list[bool], holder: dict | None = None) -> None:
|
||||||
|
self._try = list(try_results) # queued pg_try_advisory_lock returns
|
||||||
|
self.holder = holder
|
||||||
|
self.executed: list[str] = []
|
||||||
|
self.terminated: list[int] = []
|
||||||
|
self.keepalive_touches = 0
|
||||||
|
|
||||||
|
async def fetchval(self, sql: str, *args): # noqa: ANN002
|
||||||
|
s = sql.lower()
|
||||||
|
if "pg_try_advisory_lock" in s:
|
||||||
|
return self._try.pop(0)
|
||||||
|
if "pg_advisory_unlock" in s:
|
||||||
|
return True
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def fetchrow(self, sql: str, *args): # noqa: ANN002
|
||||||
|
return self.holder
|
||||||
|
|
||||||
|
async def execute(self, sql: str, *args): # noqa: ANN002
|
||||||
|
self.executed.append(sql)
|
||||||
|
low = sql.lower()
|
||||||
|
if "pg_terminate_backend" in low:
|
||||||
|
self.terminated.append(args[0])
|
||||||
|
if low.strip() == "select 1":
|
||||||
|
self.keepalive_touches += 1
|
||||||
|
return "OK"
|
||||||
|
|
||||||
|
|
||||||
|
class _Pool:
|
||||||
|
def __init__(self, conn: _LockConn) -> None:
|
||||||
|
self.conn = conn
|
||||||
|
self.acquired = 0
|
||||||
|
self.released = 0
|
||||||
|
|
||||||
|
async def acquire(self) -> _LockConn:
|
||||||
|
self.acquired += 1
|
||||||
|
return self.conn
|
||||||
|
|
||||||
|
async def release(self, conn: _LockConn) -> None:
|
||||||
|
self.released += 1
|
||||||
|
|
||||||
|
|
||||||
|
def _run(coro):
|
||||||
|
loop = asyncio.new_event_loop()
|
||||||
|
try:
|
||||||
|
return loop.run_until_complete(coro)
|
||||||
|
finally:
|
||||||
|
loop.close()
|
||||||
|
|
||||||
|
|
||||||
|
def test_acquire_when_free_returns_held_conn() -> None:
|
||||||
|
conn = _LockConn(try_results=[True])
|
||||||
|
pool = _Pool(conn)
|
||||||
|
got = _run(hx._acquire_global_lock(pool))
|
||||||
|
assert got is conn
|
||||||
|
assert pool.released == 0 # caller keeps the held connection
|
||||||
|
|
||||||
|
|
||||||
|
def test_acquire_with_live_holder_returns_none() -> None:
|
||||||
|
# Holder is idle but FRESH (keepalive active) → live extraction, do not kill.
|
||||||
|
conn = _LockConn(
|
||||||
|
try_results=[False],
|
||||||
|
holder={"pid": 555, "state": "idle", "idle_seconds": 12.0},
|
||||||
|
)
|
||||||
|
pool = _Pool(conn)
|
||||||
|
got = _run(hx._acquire_global_lock(pool))
|
||||||
|
assert got is None
|
||||||
|
assert conn.terminated == [] # never terminate a live holder
|
||||||
|
assert pool.released == 1 # connection handed back
|
||||||
|
|
||||||
|
|
||||||
|
def test_acquire_does_not_kill_active_holder() -> None:
|
||||||
|
# Even if stale-by-time, an 'active' backend is doing work — never killed.
|
||||||
|
conn = _LockConn(
|
||||||
|
try_results=[False],
|
||||||
|
holder={"pid": 556, "state": "active", "idle_seconds": 9999.0},
|
||||||
|
)
|
||||||
|
pool = _Pool(conn)
|
||||||
|
got = _run(hx._acquire_global_lock(pool))
|
||||||
|
assert got is None
|
||||||
|
assert conn.terminated == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_acquire_reclaims_leaked_orphan() -> None:
|
||||||
|
# Idle holder past the stale threshold ⇒ leaked orphan: terminate + reacquire.
|
||||||
|
stale = hx._LOCK_STALE_AFTER + 10
|
||||||
|
conn = _LockConn(
|
||||||
|
try_results=[False, True], # fail, then succeed after terminate
|
||||||
|
holder={"pid": 777, "state": "idle", "idle_seconds": float(stale)},
|
||||||
|
)
|
||||||
|
pool = _Pool(conn)
|
||||||
|
got = _run(hx._acquire_global_lock(pool))
|
||||||
|
assert got is conn
|
||||||
|
assert conn.terminated == [777] # the orphan was terminated
|
||||||
|
assert pool.released == 0 # lock reclaimed → conn retained
|
||||||
|
|
||||||
|
|
||||||
|
def test_acquire_no_holder_row_releases() -> None:
|
||||||
|
# Lock released between the failed try and the holder lookup → give up cleanly.
|
||||||
|
conn = _LockConn(try_results=[False], holder=None)
|
||||||
|
pool = _Pool(conn)
|
||||||
|
got = _run(hx._acquire_global_lock(pool))
|
||||||
|
assert got is None
|
||||||
|
assert pool.released == 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_keepalive_exits_on_stop_without_touching() -> None:
|
||||||
|
conn = _LockConn(try_results=[])
|
||||||
|
stop = asyncio.Event()
|
||||||
|
stop.set() # already signaled → must return immediately, no SELECT 1
|
||||||
|
_run(hx._lock_keepalive(conn, stop))
|
||||||
|
assert conn.keepalive_touches == 0
|
||||||
|
|
||||||
|
|
||||||
|
def test_extract_returns_busy_when_lock_unavailable(monkeypatch) -> None:
|
||||||
|
async def _no_lock(_pool):
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def _fake_pool():
|
||||||
|
return object()
|
||||||
|
|
||||||
|
monkeypatch.setattr(hx, "_acquire_global_lock", _no_lock)
|
||||||
|
monkeypatch.setattr(hx.db, "get_pool", _fake_pool)
|
||||||
|
|
||||||
|
from uuid import uuid4
|
||||||
|
result = _run(hx.extract(uuid4()))
|
||||||
|
assert result["status"] == "busy"
|
||||||
|
assert result["extracted"] == 0
|
||||||
Reference in New Issue
Block a user