Merge pull request 'fix(halacha): נעילה גלובלית — חילוץ אחד בכל רגע (מונע הקפאת מכונה)' (#30) from fix/halacha-extract-global-lock into main
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 1m35s
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 1m35s
This commit was merged in pull request #30.
This commit is contained in:
@@ -54,6 +54,11 @@ REDIS_URL = os.environ.get("REDIS_URL", "redis://127.0.0.1:6380/0")
|
||||
# pinned.
|
||||
HALACHA_EXTRACT_MODEL = os.environ.get("HALACHA_EXTRACT_MODEL", "claude-opus-4-8")
|
||||
HALACHA_EXTRACT_EFFORT = os.environ.get("HALACHA_EXTRACT_EFFORT", "xhigh")
|
||||
# Concurrent chunks WITHIN a single extraction. Each `claude -p` subprocess at xhigh effort holds
|
||||
# ~300MB RSS + heavy CPU; cross-process overlap (agent retries) on top of this
|
||||
# froze the box on 2026-05-31 (hard reboot). A global advisory lock now caps
|
||||
# the system to ONE extraction at a time; this caps the chunks within it.
|
||||
HALACHA_CHUNK_CONCURRENCY = int(os.environ.get("HALACHA_CHUNK_CONCURRENCY", "3"))
|
||||
HALACHA_CORROBORATION_MATCH_FLOOR = float(os.environ.get("HALACHA_CORROBORATION_MATCH_FLOOR", "0.50"))
|
||||
HALACHA_CORROBORATION_MIN_CITES = int(os.environ.get("HALACHA_CORROBORATION_MIN_CITES", "2"))
|
||||
|
||||
|
||||
@@ -33,7 +33,19 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
# Concurrency model mirrors claims_extractor — each ``claude -p`` subprocess
|
||||
# holds ~300 MB RSS, so we cap parallel chunks to keep the box healthy.
|
||||
CHUNK_CONCURRENCY = 3
|
||||
# Env-tunable (HALACHA_CHUNK_CONCURRENCY) — see config.py.
|
||||
CHUNK_CONCURRENCY = config.HALACHA_CHUNK_CONCURRENCY
|
||||
|
||||
# Global cross-process serialization key for halacha extraction. Every
|
||||
# extraction (whichever process/agent/driver launched it) takes a PostgreSQL
|
||||
# advisory lock on this key first; if another extraction already holds it the
|
||||
# call returns ``status='busy'`` and the request stays pending for the next
|
||||
# drain. This makes "one extraction at a time" hold across SEPARATE OS
|
||||
# processes (agent fallback retries spawn independent `python -c` drivers — an
|
||||
# in-process Semaphore cannot see them). Root cause of the 2026-05-31 freeze:
|
||||
# 4-5 overlapping driver processes × CHUNK_CONCURRENCY (3) each → 12-15 concurrent
|
||||
# xhigh `claude -p` procs → load 69 → hard reboot.
|
||||
_HALACHA_EXTRACT_LOCK_KEY = 0x48414C41 # 'HALA'
|
||||
CHUNK_RETRY_ATTEMPTS = 1
|
||||
|
||||
# If at least this fraction of chunks crash and the precedent yields zero
|
||||
@@ -331,10 +343,14 @@ async def _extract_chunk(
|
||||
|
||||
|
||||
async def extract(case_law_id: UUID | str) -> dict:
|
||||
"""Extract halachot from an uploaded precedent and store them.
|
||||
"""Extract halachot from an uploaded precedent — globally serialized.
|
||||
|
||||
Idempotent: replaces any existing halachot for this case_law_id.
|
||||
All inserted rows start as ``review_status='pending_review'``.
|
||||
Takes a PostgreSQL advisory lock so only ONE extraction runs at a time
|
||||
across ALL processes (agent retries + batch ``process_pending`` spawn
|
||||
independent OS drivers; an in-process Semaphore can't see them). If another
|
||||
extraction already holds the lock this returns ``status='busy'`` and the
|
||||
precedent stays pending for the next drain — no second xhigh run piles on
|
||||
(this is the fix for the 2026-05-31 box freeze).
|
||||
|
||||
Returns:
|
||||
``{"status": "...", "extracted": N, "verified": M, "stored": K, ...}``
|
||||
@@ -342,6 +358,37 @@ async def extract(case_law_id: UUID | str) -> dict:
|
||||
if isinstance(case_law_id, str):
|
||||
case_law_id = UUID(case_law_id)
|
||||
|
||||
pool = await db.get_pool()
|
||||
lock_conn = await pool.acquire()
|
||||
try:
|
||||
got = await lock_conn.fetchval(
|
||||
"SELECT pg_try_advisory_lock($1)", _HALACHA_EXTRACT_LOCK_KEY,
|
||||
)
|
||||
if not got:
|
||||
logger.warning(
|
||||
"halacha extract: global lock held by another extraction — "
|
||||
"skipping %s (stays pending for next drain)", case_law_id,
|
||||
)
|
||||
return {
|
||||
"status": "busy", "extracted": 0, "stored": 0,
|
||||
"case_law_id": str(case_law_id),
|
||||
}
|
||||
try:
|
||||
return await _extract_impl(case_law_id)
|
||||
finally:
|
||||
await lock_conn.fetchval(
|
||||
"SELECT pg_advisory_unlock($1)", _HALACHA_EXTRACT_LOCK_KEY,
|
||||
)
|
||||
finally:
|
||||
await pool.release(lock_conn)
|
||||
|
||||
|
||||
async def _extract_impl(case_law_id: UUID) -> dict:
|
||||
"""Core extraction (caller holds the global advisory lock for the duration).
|
||||
|
||||
Idempotent: replaces any existing halachot for this case_law_id.
|
||||
All inserted rows start as ``review_status='pending_review'``.
|
||||
"""
|
||||
record = await db.get_case_law(case_law_id)
|
||||
if not record:
|
||||
return {"status": "not_found", "extracted": 0, "stored": 0}
|
||||
|
||||
Reference in New Issue
Block a user