Files
legal-ai/scripts/drain_halacha_queue.py
Chaim 4ac661b0a6
All checks were successful
G12 Leak-Guard / leak-guard (pull_request) Successful in 5s
Lint — undefined names / undefined-names (pull_request) Successful in 11s
feat(halacha): עצירה-רכה של הדריינר בסף-ניצול + מקור-אמת יחיד למכסה
בהגיע סף-רך (5-שעות ≥75% / שבועי ≥65%) הדריינר מסיים את התיק שרץ ועוצר
לבד בין תיקים (===STOP===) — במקום שהסופרוויזר יקטול אותו ב-pm2 stop באמצע
חילוץ. ב-75% יש מכסה לסיים את התיק; הקטילה נשמרת רק ל-429 טרי אמיתי.

- חדש legal_mcp/services/usage_limits.py (stdlib-only): מקור-אמת יחיד —
  subscription_usage / USAGE_CEILINGS / ceiling_status. מיובא גם מ-system-python
  (supervisor) וגם מה-venv (drain). __init__ ריקים → import בטוח מחוץ ל-venv.
- supervisor: מייבא מהמודול (הסרת ההעתקים המקומיים, ~50 שורות פחות);
  quota_exhausted/quota_available הפכו wrappers דקים; ענף cooldown — קטילה
  (hold-stopped) רק אם log_rl (429 טרי), אחרת hold-soft בלי pm2 stop.
- drain: limit=4→1 (בדיקת-סף בין כל תיק); שער-סף ב-run_in_executor, fail-OPEN
  כש-endpoint None (הסופרוויזר מגבה ב-429-kill); שמירת קצב 30ש' בין תיקים
  (pl.INTER_PRECEDENT_COOLDOWN_SEC — limit=1 ביטל את המרווח הפנימי-לסבב).
- SCRIPTS.md עודכן (limit=1, שער-סף, hold-soft, מקור-אמת משותף).

אומת end-to-end (endpoint חי): (1) drain עם סף מורד → ===STOP=== usage ceiling
בלי לעבד תיק; (2) supervisor status=online+סף-רך → action=hold-soft, stop_drain
נקרא 0; (3) 429 טרי → hold-stopped, stop_drain נקרא 1. py_compile עובר.

court_fetch_service/usage_status (העתק שלישי, async/aiohttp, רגיש-דיפלוי) נדחה
במכוון לאיחוד-עתידי — לא נוגעים בגשר-המארח כאן.

Invariants: G1 (נרמול-במקור — endpoint יחיד), G2 (אין מסלול-בקרה מקביל — מודול
משותף יחיד, drain+supervisor קוראים אותו דבר), X16 (עמידות — עצירה לפני 429
מונעת חילוץ-מחדש משחית). G12 לא רלוונטי.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-15 04:11:09 +00:00

127 lines
6.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Drain the halacha extraction queue for the incoming batch.
Calls the canonical process_pending_extractions(kind='halacha') ONE case per round
until the queue is empty (two consecutive zero-progress rounds). Serial + global
advisory-lock coordinated inside the service — avoids concurrent Claude load spikes.
NIGHT-WINDOW: halacha extraction is slow (Opus, ~10 min/case) and token-heavy, so
the backlog drain runs ONLY in an off-hours window (default 23:0005:00 Israel
time) — it never competes with daytime interactive work or other agents. A tick
that starts at 23:00 keeps going until the queue empties OR the window closes
(checked before every round); whatever's left resumes the next night (FIFO +
per-chunk checkpoint → no lost or duplicated work). Single-case extraction
requested by the chair goes through the CEO immediately and is NOT gated here.
Window is DST-safe (zoneinfo) — the host runs in UTC. Env overrides:
HALACHA_DRAIN_WINDOW_START / _END (hours, 023) · HALACHA_DRAIN_TZ.
USAGE-CEILING: between cases the drain checks the claude.ai utilization and stops
GRACEFULLY (===STOP===) once a soft ceiling is crossed (5-hour ≥75% / weekly ≥65%,
USAGE_CEILINGS in legal_mcp.services.usage_limits — the SAME source the supervisor
reads). Because it's checked between cases, the in-flight case always finishes on
the remaining quota; the drain just doesn't start a new one — stopping BEFORE a 429
rather than hammering it. Resumes when the window resets. Env overrides:
HALACHA_DRAIN_CEILING_5H / _WEEKLY.
Run: mcp-server/.venv/bin/python scripts/drain_halacha_queue.py
"""
import asyncio
import os
import sys
from datetime import datetime
from zoneinfo import ZoneInfo
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "mcp-server", "src"))
from legal_mcp.services import db
from legal_mcp.services import precedent_library as pl
from legal_mcp.services import usage_limits
_TZ = ZoneInfo(os.environ.get("HALACHA_DRAIN_TZ", "Asia/Jerusalem"))
_WINDOW_START = int(os.environ.get("HALACHA_DRAIN_WINDOW_START", "23"))
_WINDOW_END = int(os.environ.get("HALACHA_DRAIN_WINDOW_END", "5"))
def _in_window() -> bool:
"""True iff the current Israel-time hour is inside [START, END).
Handles the midnight wrap (e.g. 23→5): the window is the union of
[START,24) and [0,END). If START == END the window is treated as 'always'.
"""
if _WINDOW_START == _WINDOW_END:
return True
hour = datetime.now(_TZ).hour
if _WINDOW_START < _WINDOW_END: # same-day window, e.g. 1→5
return _WINDOW_START <= hour < _WINDOW_END
return hour >= _WINDOW_START or hour < _WINDOW_END # wraps midnight, e.g. 23→5
async def main():
# /operations "disable" switch — no-op immediately if turned off (pm2
# cron_restart can still fire a stopped job, so the gate lives in the DB).
if await db.is_drain_disabled("legal-halacha-drain"):
print("===SKIP=== legal-halacha-drain disabled via /operations", flush=True)
return
if not _in_window():
now = datetime.now(_TZ).strftime("%H:%M %Z")
print(f"===SKIP=== outside drain window {_WINDOW_START:02d}:00"
f"{_WINDOW_END:02d}:00 (now {now})", flush=True)
return
total = 0
empty_rounds = 0
rnd = 0
while empty_rounds < 2:
# Re-check the /operations kill-switch each round so a drain disabled
# mid-run halts at the next round boundary (not only at startup). The UI
# toggle + supervisor stop the process outright; this is the in-process
# backstop. Per-chunk checkpoints mean the current case loses nothing.
if await db.is_drain_disabled("legal-halacha-drain"):
print(f"===STOP=== disabled via /operations mid-run — halting "
f"({total} cases this run; resumes when re-enabled)", flush=True)
break
if not _in_window():
print(f"===STOP=== drain window closed ({_WINDOW_END:02d}:00) — "
f"{total} cases this run; rest resumes next night", flush=True)
break
# Soft usage-ceiling gate (chair: 5-hour ≥75% / weekly ≥65%). Checked
# BETWEEN cases (limit=1 below), so the in-flight case always finishes on
# the remaining quota and we just don't start a new one — stopping BEFORE
# a 429 instead of hammering it. Same USAGE_CEILINGS the supervisor reads
# (single source of truth: legal_mcp.services.usage_limits).
# FAIL-OPEN on endpoint failure (returns None): keep draining — the
# supervisor backstops a real 429 by hard-killing. Do NOT "fix" this to
# fail-closed; a throttled endpoint must not halt the night's drain.
usage = await asyncio.get_event_loop().run_in_executor(
None, usage_limits.subscription_usage)
if usage is not None:
over, _, detail = usage_limits.ceiling_status(usage)
if over:
print(f"===STOP=== usage ceiling reached ({detail}) — finished "
f"{total} case(s); resumes when quota resets", flush=True)
break
rnd += 1
# limit=1 → the ceiling (and window/disabled) are re-checked between every
# case, so a soft stop never abandons an in-flight extraction.
out = await pl.process_pending_extractions(kind="halacha", limit=1)
processed = out.get("processed", 0)
total_pending = out.get("total_pending", 0)
total += processed
print(f"[round {rnd}] processed={processed} total_pending={total_pending} status={out.get('status')}", flush=True)
for r in out.get("results", []):
print(f" {r.get('case_number')}: {r.get('status')} stored={r.get('stored')} retry={r.get('retry_attempts')}", flush=True)
if processed == 0:
empty_rounds += 1
await asyncio.sleep(5)
else:
empty_rounds = 0
# Preserve the inter-case spacer that process_pending_extractions
# applies WITHIN a batch (only when idx>0) — with limit=1 every batch
# is one case, so it never fires there; pace here instead to avoid
# 429 bursts. Single source: the service's own cooldown constant.
await asyncio.sleep(pl.INTER_PRECEDENT_COOLDOWN_SEC)
print(f"\n===DONE=== total halachot rounds processed; cases handled cumulatively={total}", flush=True)
if __name__ == "__main__":
asyncio.run(main())