fix(halacha): global advisory lock — one extraction at a time (prevents box freeze)

2026-05-31: opus-4-8 @ xhigh extraction + overlapping driver processes (agent
fallback retries each spawn an independent `python -c` driver; process_pending is
serial WITHIN a process but the box ran 4-5 drivers in parallel) → 12-16 concurrent
xhigh `claude -p` procs → load 69 → hard reboot.

Fix: halacha_extractor.extract() now takes a Postgres advisory lock
(pg_try_advisory_lock, key 'HALA') before any work. If another extraction (any
process/agent/driver — all share the legal-ai DB) holds it, the call returns
status='busy' and the precedent stays pending for the next drain. Guarantees ONE
extraction at a time ACROSS PROCESSES — an in-process Semaphore cannot (drivers
are separate OS processes). Core logic moved to _extract_impl (unchanged) under
the lock. CHUNK_CONCURRENCY now env-tunable (HALACHA_CHUNK_CONCURRENCY, default 3).

Verified: while a lock is held, extract() returns 'busy' with no LLM call; lock
releases cleanly and the next extraction proceeds. Tracks #72.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-31 20:42:15 +00:00
parent 62e5e5183d
commit 807053ec54
2 changed files with 56 additions and 4 deletions

View File

@@ -33,7 +33,19 @@ logger = logging.getLogger(__name__)
# Concurrency model mirrors claims_extractor — each ``claude -p`` subprocess
# holds ~300 MB RSS, so we cap parallel chunks to keep the box healthy.
CHUNK_CONCURRENCY = 3
# Env-tunable (HALACHA_CHUNK_CONCURRENCY) — see config.py.
CHUNK_CONCURRENCY = config.HALACHA_CHUNK_CONCURRENCY
# Global cross-process serialization key for halacha extraction. Every
# extraction (whichever process/agent/driver launched it) takes a PostgreSQL
# advisory lock on this key first; if another extraction already holds it the
# call returns ``status='busy'`` and the request stays pending for the next
# drain. This makes "one extraction at a time" hold across SEPARATE OS
# processes (agent fallback retries spawn independent `python -c` drivers — an
# in-process Semaphore cannot see them). Root cause of the 2026-05-31 freeze:
# 4-5 overlapping driver processes × CHUNK_CONCURRENCY each → 12-16 concurrent
# xhigh `claude -p` procs → load 69 → hard reboot.
_HALACHA_EXTRACT_LOCK_KEY = 0x48414C41 # 'HALA'
CHUNK_RETRY_ATTEMPTS = 1
# If at least this fraction of chunks crash and the precedent yields zero
@@ -331,10 +343,14 @@ async def _extract_chunk(
async def extract(case_law_id: UUID | str) -> dict:
"""Extract halachot from an uploaded precedent and store them.
"""Extract halachot from an uploaded precedent — globally serialized.
Idempotent: replaces any existing halachot for this case_law_id.
All inserted rows start as ``review_status='pending_review'``.
Takes a PostgreSQL advisory lock so only ONE extraction runs at a time
across ALL processes (agent retries + batch ``process_pending`` spawn
independent OS drivers; an in-process Semaphore can't see them). If another
extraction already holds the lock this returns ``status='busy'`` and the
precedent stays pending for the next drain — no second xhigh run piles on
(this is the fix for the 2026-05-31 box freeze).
Returns:
``{"status": "...", "extracted": N, "verified": M, "stored": K, ...}``
@@ -342,6 +358,37 @@ async def extract(case_law_id: UUID | str) -> dict:
if isinstance(case_law_id, str):
case_law_id = UUID(case_law_id)
pool = await db.get_pool()
lock_conn = await pool.acquire()
try:
got = await lock_conn.fetchval(
"SELECT pg_try_advisory_lock($1)", _HALACHA_EXTRACT_LOCK_KEY,
)
if not got:
logger.warning(
"halacha extract: global lock held by another extraction — "
"skipping %s (stays pending for next drain)", case_law_id,
)
return {
"status": "busy", "extracted": 0, "stored": 0,
"case_law_id": str(case_law_id),
}
try:
return await _extract_impl(case_law_id)
finally:
await lock_conn.fetchval(
"SELECT pg_advisory_unlock($1)", _HALACHA_EXTRACT_LOCK_KEY,
)
finally:
await pool.release(lock_conn)
async def _extract_impl(case_law_id: UUID) -> dict:
"""Core extraction (caller holds the global advisory lock for the duration).
Idempotent: replaces any existing halachot for this case_law_id.
All inserted rows start as ``review_status='pending_review'``.
"""
record = await db.get_case_law(case_law_id)
if not record:
return {"status": "not_found", "extracted": 0, "stored": 0}