"""Drain the halacha extraction queue for the incoming batch. Calls the canonical process_pending_extractions(kind='halacha') ONE case per round until the queue is empty (two consecutive zero-progress rounds). Serial + global advisory-lock coordinated inside the service — avoids concurrent Claude load spikes. NIGHT-WINDOW: halacha extraction is slow (Opus, ~10 min/case) and token-heavy, so the backlog drain runs ONLY in an off-hours window (default 23:00–05:00 Israel time) — it never competes with daytime interactive work or other agents. A tick that starts at 23:00 keeps going until the queue empties OR the window closes (checked before every round); whatever's left resumes the next night (FIFO + per-chunk checkpoint → no lost or duplicated work). Single-case extraction requested by the chair goes through the CEO immediately and is NOT gated here. Window is DST-safe (zoneinfo) — the host runs in UTC. Env overrides: HALACHA_DRAIN_WINDOW_START / _END (hours, 0–23) · HALACHA_DRAIN_TZ. USAGE-CEILING: between cases the drain checks the claude.ai utilization and stops GRACEFULLY (===STOP===) once a soft ceiling is crossed (5-hour ≥75% / weekly ≥65%, USAGE_CEILINGS in legal_mcp.services.usage_limits — the SAME source the supervisor reads). Because it's checked between cases, the in-flight case always finishes on the remaining quota; the drain just doesn't start a new one — stopping BEFORE a 429 rather than hammering it. Resumes when the window resets. Env overrides: HALACHA_DRAIN_CEILING_5H / _WEEKLY. Run: mcp-server/.venv/bin/python scripts/drain_halacha_queue.py """ import asyncio import os import sys from datetime import datetime from zoneinfo import ZoneInfo sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "mcp-server", "src")) from legal_mcp.services import db from legal_mcp.services import precedent_library as pl from legal_mcp.services import usage_limits _TZ = ZoneInfo(os.environ.get("HALACHA_DRAIN_TZ", "Asia/Jerusalem")) _WINDOW_START = int(os.environ.get("HALACHA_DRAIN_WINDOW_START", "23")) _WINDOW_END = int(os.environ.get("HALACHA_DRAIN_WINDOW_END", "5")) def _in_window() -> bool: """True iff the current Israel-time hour is inside [START, END). Handles the midnight wrap (e.g. 23→5): the window is the union of [START,24) and [0,END). If START == END the window is treated as 'always'. """ if _WINDOW_START == _WINDOW_END: return True hour = datetime.now(_TZ).hour if _WINDOW_START < _WINDOW_END: # same-day window, e.g. 1→5 return _WINDOW_START <= hour < _WINDOW_END return hour >= _WINDOW_START or hour < _WINDOW_END # wraps midnight, e.g. 23→5 async def main(): # /operations "disable" switch — no-op immediately if turned off (pm2 # cron_restart can still fire a stopped job, so the gate lives in the DB). if await db.is_drain_disabled("legal-halacha-drain"): print("===SKIP=== legal-halacha-drain disabled via /operations", flush=True) return if not _in_window(): now = datetime.now(_TZ).strftime("%H:%M %Z") print(f"===SKIP=== outside drain window {_WINDOW_START:02d}:00–" f"{_WINDOW_END:02d}:00 (now {now})", flush=True) return total = 0 empty_rounds = 0 rnd = 0 while empty_rounds < 2: # Re-check the /operations kill-switch each round so a drain disabled # mid-run halts at the next round boundary (not only at startup). The UI # toggle + supervisor stop the process outright; this is the in-process # backstop. Per-chunk checkpoints mean the current case loses nothing. if await db.is_drain_disabled("legal-halacha-drain"): print(f"===STOP=== disabled via /operations mid-run — halting " f"({total} cases this run; resumes when re-enabled)", flush=True) break if not _in_window(): print(f"===STOP=== drain window closed ({_WINDOW_END:02d}:00) — " f"{total} cases this run; rest resumes next night", flush=True) break # Soft usage-ceiling gate (chair: 5-hour ≥75% / weekly ≥65%). Checked # BETWEEN cases (limit=1 below), so the in-flight case always finishes on # the remaining quota and we just don't start a new one — stopping BEFORE # a 429 instead of hammering it. Same USAGE_CEILINGS the supervisor reads # (single source of truth: legal_mcp.services.usage_limits). # FAIL-OPEN on endpoint failure (returns None): keep draining — the # supervisor backstops a real 429 by hard-killing. Do NOT "fix" this to # fail-closed; a throttled endpoint must not halt the night's drain. usage = await asyncio.get_event_loop().run_in_executor( None, usage_limits.subscription_usage) if usage is not None: over, _, detail = usage_limits.ceiling_status(usage) if over: print(f"===STOP=== usage ceiling reached ({detail}) — finished " f"{total} case(s); resumes when quota resets", flush=True) break rnd += 1 # limit=1 → the ceiling (and window/disabled) are re-checked between every # case, so a soft stop never abandons an in-flight extraction. out = await pl.process_pending_extractions(kind="halacha", limit=1) processed = out.get("processed", 0) total_pending = out.get("total_pending", 0) total += processed print(f"[round {rnd}] processed={processed} total_pending={total_pending} status={out.get('status')}", flush=True) for r in out.get("results", []): print(f" {r.get('case_number')}: {r.get('status')} stored={r.get('stored')} retry={r.get('retry_attempts')}", flush=True) if processed == 0: empty_rounds += 1 await asyncio.sleep(5) else: empty_rounds = 0 # Preserve the inter-case spacer that process_pending_extractions # applies WITHIN a batch (only when idx>0) — with limit=1 every batch # is one case, so it never fires there; pace here instead to avoid # 429 bursts. Single source: the service's own cooldown constant. await asyncio.sleep(pl.INTER_PRECEDENT_COOLDOWN_SEC) print(f"\n===DONE=== total halachot rounds processed; cases handled cumulatively={total}", flush=True) if __name__ == "__main__": asyncio.run(main())