"""Drain the halacha extraction queue for the incoming batch. Calls the canonical process_pending_extractions(kind='halacha') in small batches until the queue is empty (two consecutive zero-progress rounds). Serial + global advisory-lock coordinated inside the service — avoids concurrent Claude load spikes. Run: mcp-server/.venv/bin/python scripts/drain_halacha_queue.py """ import asyncio import os import sys sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "mcp-server", "src")) from legal_mcp.services import db from legal_mcp.services import precedent_library as pl async def main(): # /operations "disable" switch — no-op immediately if turned off (pm2 # cron_restart can still fire a stopped job, so the gate lives in the DB). if await db.is_drain_disabled("legal-halacha-drain"): print("===SKIP=== legal-halacha-drain disabled via /operations", flush=True) return total = 0 empty_rounds = 0 rnd = 0 while empty_rounds < 2: rnd += 1 out = await pl.process_pending_extractions(kind="halacha", limit=4) processed = out.get("processed", 0) total_pending = out.get("total_pending", 0) total += processed print(f"[round {rnd}] processed={processed} total_pending={total_pending} status={out.get('status')}", flush=True) for r in out.get("results", []): print(f" {r.get('case_number')}: {r.get('status')} stored={r.get('stored')} retry={r.get('retry_attempts')}", flush=True) if processed == 0: empty_rounds += 1 await asyncio.sleep(5) else: empty_rounds = 0 print(f"\n===DONE=== total halachot rounds processed; cases handled cumulatively={total}", flush=True) if __name__ == "__main__": asyncio.run(main())