fix(halacha): שחזור-עצמי לנעילת-advisory דלופה — לא לחסום חילוץ-הלכות (#142)
כשה-legal-halacha-drain קרס עם "RuntimeError: Event loop is closed", ה-finally שמריץ pg_advisory_unlock + pool.release לא רץ, וחיבור-הנעילה הייעודי נשאר חי, idle, מחזיק את הנעילה הגלובלית — כל extract עתידי החזיר status='busy' לצמיתות עד pg_terminate_backend ידני (~4.5 דק', CMP-174, 2026-06-14). תיקון (G1 — נרמול-במקור, G2 — אותה נעילה, בלי מסלול מקביל): - KEEPALIVE: משימת-רקע נוגעת בחיבור-הנעילה כל 30ש' → state_change נשאר טרי. חילוץ חי לעולם לא נראה "תקוע"; קריסה מקפיאה את ה-keepalive ואת state_change. - שחזור-עצמי בכניסה (_acquire_global_lock): כש-pg_try_advisory_lock נכשל, בודקים את ה-holder; רק backend idle עם state_change ישן מ-_LOCK_STALE_AFTER (150ש', 5× keepalive) הוא orphan דלוף → pg_terminate_backend ואז acquire מחדש. backend 'active' או idle-טרי = חילוץ חי, לעולם לא נהרג (מניעת ה-box-freeze). - נדחתה אופציית pg_advisory_xact_lock: הייתה כופה transaction פתוח לאורך דקות (idle-in-transaction bloat) ועדיין לא משחררת מיידית חיבור-orphan חי. הערה: השתמשתי במונח DB-סטנדרטי "keepalive" (לא "heartbeat") כי leak_guard מסמן את "heartbeat" כסמל ספציפי-Paperclip (G12). בדיקות: tests/test_halacha_lock_selfheal.py (7) — free/live-holder/active-holder/ stale-orphan-reclaim/no-holder/keepalive-stop/extract-busy. כל 332 בדיקות mcp עוברות. Invariants: G1 (תיקון-במקור), G2 (אותה נעילה), G3/X16 (עמידות-פייפליין), G12 (leak-guard נקי). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -50,6 +50,34 @@ CHUNK_CONCURRENCY = config.HALACHA_CHUNK_CONCURRENCY
|
||||
# 4-5 overlapping driver processes × CHUNK_CONCURRENCY each → 12-16 concurrent
|
||||
# xhigh `claude -p` procs → load 69 → hard reboot.
|
||||
_HALACHA_EXTRACT_LOCK_KEY = 0x48414C41 # 'HALA'
|
||||
|
||||
# The advisory lock is a SESSION lock held on a dedicated ``lock_conn`` that
|
||||
# sits idle while extraction work runs on OTHER pool connections. A hard crash
|
||||
# ("RuntimeError: Event loop is closed", OOM-kill, container restart) skips the
|
||||
# ``finally`` that unlocks AND skips ``pool.release`` — so the backend stays
|
||||
# alive, idle, holding the lock, and EVERY future extraction gets ``busy``
|
||||
# forever (#142: a leaked lock froze all halacha extraction ~4.5 min on
|
||||
# 2026-06-14 until a manual ``pg_terminate_backend``). Two mechanisms make the
|
||||
# lock self-recovering:
|
||||
# 1. KEEPALIVE — a background task touches ``lock_conn`` every
|
||||
# ``_LOCK_KEEPALIVE_INTERVAL`` s, keeping ``pg_stat_activity.state_change``
|
||||
# fresh. A *live* extraction's lock-holder is therefore never stale; a
|
||||
# crashed one's keepalive dies with the loop, so ``state_change`` freezes.
|
||||
# 2. SELF-HEAL ON ACQUIRE — when ``pg_try_advisory_lock`` fails, we inspect
|
||||
# the current holder; if it is idle AND its ``state_change`` is older than
|
||||
# ``_LOCK_STALE_AFTER`` (≫ keepalive interval) it is a leaked orphan, so we
|
||||
# ``pg_terminate_backend`` it (its session locks release on exit) and retry.
|
||||
# This is why a session lock + keepalive is preferred over ``pg_advisory_xact_lock``
|
||||
# (option ג): an xact lock would force a multi-minute open transaction
|
||||
# (idle-in-transaction bloat) and STILL wouldn't release a live-but-orphaned
|
||||
# connection promptly. ``_LOCK_STALE_AFTER`` is large enough that an in-flight
|
||||
# extraction is never mistaken for an orphan — the box-freeze the lock prevents
|
||||
# must never be re-introduced by killing a live holder.
|
||||
_LOCK_KEEPALIVE_INTERVAL = 30 # seconds between lock-conn keepalive touches
|
||||
_LOCK_STALE_AFTER = 150 # seconds idle ⇒ leaked orphan (5× keepalive)
|
||||
_LOCK_RECLAIM_RETRIES = 10 # poll attempts to re-acquire after terminate
|
||||
_LOCK_RECLAIM_DELAY = 0.2 # seconds between reclaim polls
|
||||
|
||||
CHUNK_RETRY_ATTEMPTS = 1
|
||||
|
||||
# If at least this fraction of chunks crash and the precedent yields zero
|
||||
@@ -456,6 +484,82 @@ async def _extract_chunk(
|
||||
return [], False
|
||||
|
||||
|
||||
# Reconstruct the 64-bit advisory key from pg_locks' (classid, objid) pair so
|
||||
# the holder lookup is correct regardless of how Postgres splits the key.
|
||||
_LOCK_HOLDER_SQL = """
|
||||
SELECT a.pid,
|
||||
a.state,
|
||||
EXTRACT(EPOCH FROM (now() - a.state_change)) AS idle_seconds
|
||||
FROM pg_locks l
|
||||
JOIN pg_stat_activity a ON a.pid = l.pid
|
||||
WHERE l.locktype = 'advisory'
|
||||
AND ((l.classid::bigint << 32) | (l.objid::bigint)) = $1
|
||||
AND l.objsubid = 1
|
||||
AND l.granted
|
||||
AND a.pid <> pg_backend_pid()
|
||||
LIMIT 1
|
||||
"""
|
||||
|
||||
|
||||
async def _acquire_global_lock(pool) -> "asyncpg.Connection | None":
|
||||
"""Take the global advisory lock, self-healing a leaked (orphaned) holder.
|
||||
|
||||
Returns a connection that HOLDS the lock, or ``None`` if a *live* extraction
|
||||
legitimately holds it. On failure we look up the holder: only an **idle**
|
||||
backend whose ``state_change`` is older than ``_LOCK_STALE_AFTER`` (i.e. its
|
||||
keepalive stopped — a crash) is treated as a leaked orphan and terminated;
|
||||
a live extraction's holder is kept fresh by its keepalive and is never
|
||||
killed, so the serialization guarantee (and the box-freeze it prevents) is
|
||||
preserved.
|
||||
"""
|
||||
conn = await pool.acquire()
|
||||
try:
|
||||
if await conn.fetchval("SELECT pg_try_advisory_lock($1)",
|
||||
_HALACHA_EXTRACT_LOCK_KEY):
|
||||
return conn
|
||||
holder = await conn.fetchrow(_LOCK_HOLDER_SQL, _HALACHA_EXTRACT_LOCK_KEY)
|
||||
if (holder and holder["state"] == "idle"
|
||||
and (holder["idle_seconds"] or 0) >= _LOCK_STALE_AFTER):
|
||||
logger.warning(
|
||||
"halacha extract: reclaiming LEAKED lock — holder pid=%s idle "
|
||||
"%.0fs (≥%ds, keepalive stopped → crashed). pg_terminate_backend.",
|
||||
holder["pid"], holder["idle_seconds"], _LOCK_STALE_AFTER,
|
||||
)
|
||||
await conn.execute("SELECT pg_terminate_backend($1)", holder["pid"])
|
||||
for _ in range(_LOCK_RECLAIM_RETRIES):
|
||||
if await conn.fetchval("SELECT pg_try_advisory_lock($1)",
|
||||
_HALACHA_EXTRACT_LOCK_KEY):
|
||||
logger.info("halacha extract: leaked lock reclaimed.")
|
||||
return conn
|
||||
await asyncio.sleep(_LOCK_RECLAIM_DELAY)
|
||||
await pool.release(conn)
|
||||
return None
|
||||
except Exception:
|
||||
await pool.release(conn)
|
||||
raise
|
||||
|
||||
|
||||
async def _lock_keepalive(conn, stop: asyncio.Event) -> None:
|
||||
"""Touch ``conn`` every ``_LOCK_KEEPALIVE_INTERVAL`` s while extraction runs.
|
||||
|
||||
Keeps the lock-holder's ``pg_stat_activity.state_change`` fresh so a live
|
||||
extraction is never mistaken for a leaked orphan by ``_acquire_global_lock``.
|
||||
Exits on ``stop`` (clean finish) or on any DB error (so the final unlock,
|
||||
which reuses ``conn``, never races a keepalive query on the same connection).
|
||||
"""
|
||||
while not stop.is_set():
|
||||
try:
|
||||
await asyncio.wait_for(stop.wait(), timeout=_LOCK_KEEPALIVE_INTERVAL)
|
||||
return # stop signaled — clean exit
|
||||
except asyncio.TimeoutError:
|
||||
pass
|
||||
try:
|
||||
await conn.execute("SELECT 1")
|
||||
except Exception as e:
|
||||
logger.warning("halacha lock keepalive failed (stopping): %s", e)
|
||||
return
|
||||
|
||||
|
||||
async def extract(case_law_id: UUID | str, force: bool = False,
|
||||
effort: str | None = None) -> dict:
|
||||
"""Extract halachot from an uploaded precedent — globally serialized.
|
||||
@@ -483,28 +587,35 @@ async def extract(case_law_id: UUID | str, force: bool = False,
|
||||
case_law_id = UUID(case_law_id)
|
||||
|
||||
pool = await db.get_pool()
|
||||
lock_conn = await pool.acquire()
|
||||
try:
|
||||
got = await lock_conn.fetchval(
|
||||
"SELECT pg_try_advisory_lock($1)", _HALACHA_EXTRACT_LOCK_KEY,
|
||||
lock_conn = await _acquire_global_lock(pool)
|
||||
if lock_conn is None:
|
||||
logger.warning(
|
||||
"halacha extract: global lock held by a live extraction — "
|
||||
"skipping %s (stays pending for next drain)", case_law_id,
|
||||
)
|
||||
if not got:
|
||||
logger.warning(
|
||||
"halacha extract: global lock held by another extraction — "
|
||||
"skipping %s (stays pending for next drain)", case_law_id,
|
||||
)
|
||||
return {
|
||||
"status": "busy", "extracted": 0, "stored": 0,
|
||||
"case_law_id": str(case_law_id),
|
||||
}
|
||||
return {
|
||||
"status": "busy", "extracted": 0, "stored": 0,
|
||||
"case_law_id": str(case_law_id),
|
||||
}
|
||||
|
||||
stop_keepalive = asyncio.Event()
|
||||
keepalive_task = asyncio.create_task(_lock_keepalive(lock_conn, stop_keepalive))
|
||||
try:
|
||||
return await _extract_impl(case_law_id, force=force, effort=effort)
|
||||
finally:
|
||||
# Stop the keepalive and await it BEFORE reusing lock_conn for unlock —
|
||||
# two coroutines must never query the same asyncpg connection at once.
|
||||
stop_keepalive.set()
|
||||
try:
|
||||
await keepalive_task
|
||||
except Exception: # pragma: no cover — keepalive swallows its own errors
|
||||
logger.warning("halacha lock keepalive task ended abnormally")
|
||||
try:
|
||||
return await _extract_impl(case_law_id, force=force, effort=effort)
|
||||
finally:
|
||||
await lock_conn.fetchval(
|
||||
"SELECT pg_advisory_unlock($1)", _HALACHA_EXTRACT_LOCK_KEY,
|
||||
)
|
||||
finally:
|
||||
await pool.release(lock_conn)
|
||||
finally:
|
||||
await pool.release(lock_conn)
|
||||
|
||||
|
||||
async def _select_extractable_chunks(
|
||||
|
||||
Reference in New Issue
Block a user