legal-ai/scripts/drain_halacha_queue.py
Chaim 07ecb6a366
feat(halacha): soft stop of the drainer at a utilization threshold (75/65) + single source of truth for the quota (#265)
Co-authored-by: Chaim <chaim@marcus-law.co.il>
Co-committed-by: Chaim <chaim@marcus-law.co.il>
2026-06-15 04:11:43 +00:00

127 lines
6.4 KiB
Python

"""Drain the halacha extraction queue for the incoming batch.

Calls the canonical process_pending_extractions(kind='halacha') ONE case per round
until the queue is empty (two consecutive zero-progress rounds). Serial + global
advisory-lock coordinated inside the service — avoids concurrent Claude load spikes.

NIGHT-WINDOW: halacha extraction is slow (Opus, ~10 min/case) and token-heavy, so
the backlog drain runs ONLY in an off-hours window (default 23:00-05:00 Israel
time) — it never competes with daytime interactive work or other agents. A tick
that starts at 23:00 keeps going until the queue empties OR the window closes
(checked before every round); whatever's left resumes the next night (FIFO +
per-chunk checkpoint → no lost or duplicated work). Single-case extraction
requested by the chair goes through the CEO immediately and is NOT gated here.
Window is DST-safe (zoneinfo) — the host runs in UTC. Env overrides:
HALACHA_DRAIN_WINDOW_START / _END (hours, 0-23) · HALACHA_DRAIN_TZ.

USAGE-CEILING: between cases the drain checks the claude.ai utilization and stops
GRACEFULLY (===STOP===) once a soft ceiling is crossed (5-hour ≥75% / weekly ≥65%,
USAGE_CEILINGS in legal_mcp.services.usage_limits — the SAME source the supervisor
reads). Because it's checked between cases, the in-flight case always finishes on
the remaining quota; the drain just doesn't start a new one — stopping BEFORE a 429
rather than hammering it. A later run resumes once the quota window resets. Env
overrides: HALACHA_DRAIN_CEILING_5H / _WEEKLY.

Run: mcp-server/.venv/bin/python scripts/drain_halacha_queue.py
"""

import asyncio
import os
import sys
from datetime import datetime
from zoneinfo import ZoneInfo

# Make the in-repo legal_mcp package importable (scripts/../mcp-server/src).
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "mcp-server", "src"))

from legal_mcp.services import db
from legal_mcp.services import precedent_library as pl
from legal_mcp.services import usage_limits

_TZ = ZoneInfo(os.environ.get("HALACHA_DRAIN_TZ", "Asia/Jerusalem"))
_WINDOW_START = int(os.environ.get("HALACHA_DRAIN_WINDOW_START", "23"))
_WINDOW_END = int(os.environ.get("HALACHA_DRAIN_WINDOW_END", "5"))


def _in_window() -> bool:
    """True iff the current hour in _TZ (Israel time by default) is inside [START, END).

    Handles the midnight wrap (e.g. 23→5): the window is the union of
    [START,24) and [0,END). If START == END the window is treated as 'always'.
    """
    if _WINDOW_START == _WINDOW_END:
        return True
    hour = datetime.now(_TZ).hour
    if _WINDOW_START < _WINDOW_END:  # same-day window, e.g. 1→5
        return _WINDOW_START <= hour < _WINDOW_END
    return hour >= _WINDOW_START or hour < _WINDOW_END  # wraps midnight, e.g. 23→5


async def main():
    # /operations "disable" switch — no-op immediately if turned off (pm2
    # cron_restart can still fire a stopped job, so the gate lives in the DB).
    if await db.is_drain_disabled("legal-halacha-drain"):
        print("===SKIP=== legal-halacha-drain disabled via /operations", flush=True)
        return
    if not _in_window():
        now = datetime.now(_TZ).strftime("%H:%M %Z")
        print(f"===SKIP=== outside drain window {_WINDOW_START:02d}:00-"
              f"{_WINDOW_END:02d}:00 (now {now})", flush=True)
        return

    total = 0
    empty_rounds = 0
    rnd = 0
    while empty_rounds < 2:
        # Re-check the /operations kill-switch each round so a drain disabled
        # mid-run halts at the next round boundary (not only at startup). The UI
        # toggle + supervisor stop the process outright; this is the in-process
        # backstop. Per-chunk checkpoints mean the current case loses nothing.
        if await db.is_drain_disabled("legal-halacha-drain"):
            print(f"===STOP=== disabled via /operations mid-run — halting "
                  f"({total} cases this run; resumes when re-enabled)", flush=True)
            break
        if not _in_window():
            print(f"===STOP=== drain window closed ({_WINDOW_END:02d}:00) — "
                  f"{total} cases this run; rest resumes next night", flush=True)
            break
        # Soft usage-ceiling gate (chair: 5-hour ≥75% / weekly ≥65%). Checked
        # BETWEEN cases (limit=1 below), so the in-flight case always finishes on
        # the remaining quota and we just don't start a new one — stopping BEFORE
        # a 429 instead of hammering it. Same USAGE_CEILINGS the supervisor reads
        # (single source of truth: legal_mcp.services.usage_limits).
        # FAIL-OPEN on endpoint failure (returns None): keep draining — the
        # supervisor backstops a real 429 by hard-killing. Do NOT "fix" this to
        # fail-closed; a throttled endpoint must not halt the night's drain.
        # subscription_usage() runs in the default thread-pool executor, which
        # keeps the call off the event loop.
        usage = await asyncio.get_running_loop().run_in_executor(
            None, usage_limits.subscription_usage)
        if usage is not None:
            over, _, detail = usage_limits.ceiling_status(usage)
            if over:
                print(f"===STOP=== usage ceiling reached ({detail}) — finished "
                      f"{total} case(s); resumes when quota resets", flush=True)
                break
        rnd += 1
        # limit=1 → the ceiling (and window/disabled) are re-checked between every
        # case, so a soft stop never abandons an in-flight extraction.
        out = await pl.process_pending_extractions(kind="halacha", limit=1)
        processed = out.get("processed", 0)
        total_pending = out.get("total_pending", 0)
        total += processed
        print(f"[round {rnd}] processed={processed} total_pending={total_pending} status={out.get('status')}", flush=True)
        for r in out.get("results", []):
            print(f" {r.get('case_number')}: {r.get('status')} stored={r.get('stored')} retry={r.get('retry_attempts')}", flush=True)
        if processed == 0:
            empty_rounds += 1
            await asyncio.sleep(5)
        else:
            empty_rounds = 0
            # Preserve the inter-case spacer that process_pending_extractions
            # applies WITHIN a batch (only when idx>0) — with limit=1 every batch
            # is one case, so it never fires there; pace here instead to avoid
            # 429 bursts. Single source: the service's own cooldown constant.
            await asyncio.sleep(pl.INTER_PRECEDENT_COOLDOWN_SEC)
    print(f"\n===DONE=== {rnd} round(s) run; {total} case(s) handled this run", flush=True)


if __name__ == "__main__":
    asyncio.run(main())
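The midnight-wrap rule in `_in_window()` is easiest to verify when the hour is passed in rather than read from the clock. The sketch below pulls the same comparison into a pure function and adds a separate UTC-to-local conversion step, since the docstring notes the host runs in UTC. `hour_in_window` and `local_hour` are hypothetical helpers, not part of the script, and the demo uses a fixed +03:00 offset only to stay deterministic; the script itself relies on `ZoneInfo` for DST.

```python
from datetime import datetime, timedelta, timezone, tzinfo


def hour_in_window(hour: int, start: int, end: int) -> bool:
    """True if `hour` falls in the half-open window [start, end) on a 24 h clock.

    Same rule as the script's _in_window(): start > end wraps midnight
    (e.g. 23 -> 5 is [23, 24) plus [0, 5)), and start == end means "always".
    """
    for h in (hour, start, end):
        if not 0 <= h <= 23:
            raise ValueError(f"hour out of range 0-23: {h}")
    if start == end:
        return True
    if start < end:  # same-day window, e.g. 1 -> 5
        return start <= hour < end
    return hour >= start or hour < end  # wraps midnight, e.g. 23 -> 5


def local_hour(now_utc: datetime, tz: tzinfo) -> int:
    """Wall-clock hour of an aware timestamp after converting it to `tz`."""
    if now_utc.tzinfo is None:
        raise ValueError("now_utc must be timezone-aware")
    return now_utc.astimezone(tz).hour


if __name__ == "__main__":
    # Fixed +03:00 offset (Israel summer time) for a deterministic demo only;
    # a real caller would pass ZoneInfo("Asia/Jerusalem") so DST is handled.
    idt = timezone(timedelta(hours=3))
    now = datetime(2026, 6, 15, 20, 30, tzinfo=timezone.utc)
    print(local_hour(now, idt), hour_in_window(local_hour(now, idt), 23, 5))
```

The conversion is why a fixed offset would be wrong part of the year. With `ZoneInfo("Asia/Jerusalem")`, 20:30 UTC maps to 23:30 in mid-June (inside the default window) but to 22:30 in mid-January (outside it).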
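The soft-ceiling gate comes down to two rules. The drain stops if any utilization is at or above its limit, and it never stops when usage is unknown (fail-open). The sketch below makes those rules explicit. It is an illustration under stated assumptions only: the usage dict shape (`{"five_hour": pct, "weekly": pct}`), the helper names `usage_gate` and `ceilings_from_env`, and the variable name `HALACHA_DRAIN_CEILING_WEEKLY` (a reading of the docstring's `_WEEKLY` shorthand) are all hypothetical. The script only shows that `usage_limits.ceiling_status` returns a three-element tuple whose first element is the stop flag and whose last is a detail string.

```python
import os
from typing import Dict, Mapping, Optional, Tuple

# Hypothetical key names; the real usage_limits data shape is not shown.
DEFAULT_CEILINGS: Dict[str, float] = {"five_hour": 75.0, "weekly": 65.0}


def ceilings_from_env(environ: Mapping[str, str] = os.environ) -> Dict[str, float]:
    """Soft ceilings in percent, overridable by env (assumed variable names)."""
    return {
        "five_hour": float(environ.get("HALACHA_DRAIN_CEILING_5H",
                                       DEFAULT_CEILINGS["five_hour"])),
        "weekly": float(environ.get("HALACHA_DRAIN_CEILING_WEEKLY",
                                    DEFAULT_CEILINGS["weekly"])),
    }


def usage_gate(usage: Optional[Mapping[str, float]],
               ceilings: Mapping[str, float]) -> Tuple[bool, str]:
    """Return (stop, detail). Unknown usage (None) never stops the drain."""
    if usage is None:
        return False, "usage unavailable (fail-open)"
    crossed = [
        f"{name} {usage[name]:.0f}% >= {limit:.0f}%"
        for name, limit in ceilings.items()
        if name in usage and usage[name] >= limit  # ">=" matches "≥75%"
    ]
    if crossed:
        return True, "; ".join(crossed)
    return False, "under ceilings"
```

Failing open is the deliberate choice the script's comments defend: a throttled usage endpoint should not end the night's drain, because the supervisor still hard-kills on a real 429.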
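The loop in `main()` can be separated from the database and the model to show its control flow. It processes one case per round, checks the gates only between rounds, resets the empty counter on any progress, and exits after two consecutive empty rounds. In the sketch below, `drain`, `make_queue`, `stop_after`, and `never_stop` are hypothetical names. `process_one` stands in for `process_pending_extractions(kind="halacha", limit=1)` and returns the processed count, and `stop_reason` stands in for the kill-switch, window, and ceiling checks.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional, Tuple


async def drain(
    process_one: Callable[[], Awaitable[int]],
    stop_reason: Callable[[], Awaitable[Optional[str]]],
    max_empty_rounds: int = 2,
    idle_sleep: float = 5.0,
    cooldown: float = 0.0,
) -> Tuple[int, int, str]:
    """Process one case per round until the queue looks empty or a gate fires.

    Returns (cases processed, rounds run, reason for stopping).
    """
    total = rounds = empty = 0
    while empty < max_empty_rounds:
        # Gates run only between cases, so a stop never interrupts
        # a case that is already in flight.
        reason = await stop_reason()
        if reason is not None:
            return total, rounds, reason
        rounds += 1
        processed = await process_one()  # at most one case per round
        total += processed
        if processed == 0:
            empty += 1                      # count consecutive empty rounds
            await asyncio.sleep(idle_sleep)
        else:
            empty = 0                       # any progress resets the counter
            await asyncio.sleep(cooldown)   # pace cases to avoid bursts
    return total, rounds, "queue empty"


# --- hypothetical test doubles ---------------------------------------------
def make_queue(n: int) -> Callable[[], Awaitable[int]]:
    """Fake process_one over a queue of n cases."""
    remaining: List[int] = [n]

    async def process_one() -> int:
        if remaining[0] == 0:
            return 0
        remaining[0] -= 1
        return 1

    return process_one


def stop_after(checks: int, reason: str) -> Callable[[], Awaitable[Optional[str]]]:
    """Fake gate that allows `checks` rounds, then reports `reason`."""
    calls: List[int] = [0]

    async def stop_reason() -> Optional[str]:
        calls[0] += 1
        return reason if calls[0] > checks else None

    return stop_reason


async def never_stop() -> Optional[str]:
    return None
```

With three queued cases and no gate firing, the sketch runs five rounds: three productive rounds and then the two empty ones that confirm the queue is drained. Requiring a second empty round, presumably to tolerate a single transient zero-progress result, costs only one extra idle sleep.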