From 807053ec54f3bce713a04a50e267c05c37482d2b Mon Sep 17 00:00:00 2001 From: Chaim Date: Sun, 31 May 2026 20:42:15 +0000 Subject: [PATCH] =?UTF-8?q?fix(halacha):=20global=20advisory=20lock=20?= =?UTF-8?q?=E2=80=94=20one=20extraction=20at=20a=20time=20(prevents=20box?= =?UTF-8?q?=20freeze)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 2026-05-31: opus-4-8 @ xhigh extraction + overlapping driver processes (agent fallback retries each spawn an independent `python -c` driver; process_pending is serial WITHIN a process but the box ran 4-5 drivers in parallel) → 12-16 concurrent xhigh `claude -p` procs → load 69 → hard reboot. Fix: halacha_extractor.extract() now takes a Postgres advisory lock (pg_try_advisory_lock, key 'HALA') before any work. If another extraction (any process/agent/driver — all share the legal-ai DB) holds it, the call returns status='busy' and the precedent stays pending for the next drain. Guarantees ONE extraction at a time ACROSS PROCESSES — an in-process Semaphore cannot (drivers are separate OS processes). Core logic moved to _extract_impl (unchanged) under the lock. CHUNK_CONCURRENCY now env-tunable (HALACHA_CHUNK_CONCURRENCY, default 3). Verified: while a lock is held, extract() returns 'busy' with no LLM call; lock releases cleanly and the next extraction proceeds. Tracks #72. Co-Authored-By: Claude Opus 4.8 (1M context) --- mcp-server/src/legal_mcp/config.py | 5 ++ .../legal_mcp/services/halacha_extractor.py | 55 +++++++++++++++++-- 2 files changed, 56 insertions(+), 4 deletions(-) diff --git a/mcp-server/src/legal_mcp/config.py b/mcp-server/src/legal_mcp/config.py index c930a1f..27afaa7 100644 --- a/mcp-server/src/legal_mcp/config.py +++ b/mcp-server/src/legal_mcp/config.py @@ -54,6 +54,11 @@ REDIS_URL = os.environ.get("REDIS_URL", "redis://127.0.0.1:6380/0") # pinned. HALACHA_EXTRACT_MODEL = os.environ.get("HALACHA_EXTRACT_MODEL", "claude-opus-4-8") HALACHA_EXTRACT_EFFORT = os.environ.get("HALACHA_EXTRACT_EFFORT", "xhigh") +# Concurrent chunks WITHIN a single extraction. Each `claude -p` @ xhigh holds +# ~300MB RSS + heavy CPU; cross-process overlap (agent retries) on top of this +# froze the box on 2026-05-31 (hard reboot). A global advisory lock now caps +# the system to ONE extraction at a time; this caps the chunks within it. +HALACHA_CHUNK_CONCURRENCY = int(os.environ.get("HALACHA_CHUNK_CONCURRENCY", "3")) HALACHA_CORROBORATION_MATCH_FLOOR = float(os.environ.get("HALACHA_CORROBORATION_MATCH_FLOOR", "0.50")) HALACHA_CORROBORATION_MIN_CITES = int(os.environ.get("HALACHA_CORROBORATION_MIN_CITES", "2")) diff --git a/mcp-server/src/legal_mcp/services/halacha_extractor.py b/mcp-server/src/legal_mcp/services/halacha_extractor.py index a722fa7..8748066 100644 --- a/mcp-server/src/legal_mcp/services/halacha_extractor.py +++ b/mcp-server/src/legal_mcp/services/halacha_extractor.py @@ -33,7 +33,19 @@ logger = logging.getLogger(__name__) # Concurrency model mirrors claims_extractor — each ``claude -p`` subprocess # holds ~300 MB RSS, so we cap parallel chunks to keep the box healthy. -CHUNK_CONCURRENCY = 3 +# Env-tunable (HALACHA_CHUNK_CONCURRENCY) — see config.py. +CHUNK_CONCURRENCY = config.HALACHA_CHUNK_CONCURRENCY + +# Global cross-process serialization key for halacha extraction. Every +# extraction (whichever process/agent/driver launched it) takes a PostgreSQL +# advisory lock on this key first; if another extraction already holds it the +# call returns ``status='busy'`` and the request stays pending for the next +# drain. This makes "one extraction at a time" hold across SEPARATE OS +# processes (agent fallback retries spawn independent `python -c` drivers — an +# in-process Semaphore cannot see them). Root cause of the 2026-05-31 freeze: +# 4-5 overlapping driver processes × CHUNK_CONCURRENCY each → 12-16 concurrent +# xhigh `claude -p` procs → load 69 → hard reboot. +_HALACHA_EXTRACT_LOCK_KEY = 0x48414C41 # 'HALA' CHUNK_RETRY_ATTEMPTS = 1 # If at least this fraction of chunks crash and the precedent yields zero @@ -331,10 +343,14 @@ async def _extract_chunk( async def extract(case_law_id: UUID | str) -> dict: - """Extract halachot from an uploaded precedent and store them. + """Extract halachot from an uploaded precedent — globally serialized. - Idempotent: replaces any existing halachot for this case_law_id. - All inserted rows start as ``review_status='pending_review'``. + Takes a PostgreSQL advisory lock so only ONE extraction runs at a time + across ALL processes (agent retries + batch ``process_pending`` spawn + independent OS drivers; an in-process Semaphore can't see them). If another + extraction already holds the lock this returns ``status='busy'`` and the + precedent stays pending for the next drain — no second xhigh run piles on + (this is the fix for the 2026-05-31 box freeze). Returns: ``{"status": "...", "extracted": N, "verified": M, "stored": K, ...}`` @@ -342,6 +358,37 @@ async def extract(case_law_id: UUID | str) -> dict: if isinstance(case_law_id, str): case_law_id = UUID(case_law_id) + pool = await db.get_pool() + lock_conn = await pool.acquire() + try: + got = await lock_conn.fetchval( + "SELECT pg_try_advisory_lock($1)", _HALACHA_EXTRACT_LOCK_KEY, + ) + if not got: + logger.warning( + "halacha extract: global lock held by another extraction — " + "skipping %s (stays pending for next drain)", case_law_id, + ) + return { + "status": "busy", "extracted": 0, "stored": 0, + "case_law_id": str(case_law_id), + } + try: + return await _extract_impl(case_law_id) + finally: + await lock_conn.fetchval( + "SELECT pg_advisory_unlock($1)", _HALACHA_EXTRACT_LOCK_KEY, + ) + finally: + await pool.release(lock_conn) + + +async def _extract_impl(case_law_id: UUID) -> dict: + """Core extraction (caller holds the global advisory lock for the duration). + + Idempotent: replaces any existing halachot for this case_law_id. + All inserted rows start as ``review_status='pending_review'``. + """ record = await db.get_case_law(case_law_id) if not record: return {"status": "not_found", "extracted": 0, "stored": 0} -- 2.49.1