legal-ai/scripts/drain_halacha_queue.py
Chaim a44827c3dd
fix(operations): disabling the halacha drain now stops a running process immediately
The /operations "disabled" toggle only wrote drain_controls.disabled, which the
drain checks at STARTUP — so a drain already mid-run kept going until the queue
emptied or the night window closed. Disabling did not stop a running drain.

Three layers, immediate + backstops:
- web/app.py operations_drain_toggle: on disable, also stop the running process
  immediately via the host pm2 bridge (_ops_pm2_control). Best-effort — a bridge
  failure doesn't fail the toggle.
- halacha_drain_supervisor.py: each tick now reads the disabled flag (added to
  db_snapshot) and, when set, stops the drain and never re-triggers it —
  regardless of burst/window. Backstop if the UI path failed (≤ one tick).
- drain_halacha_queue.py: re-check is_drain_disabled at the top of every round,
  so a drain disabled mid-run halts at the next round boundary. Per-chunk
  checkpoints mean the in-flight case loses nothing.
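
The third layer can be illustrated in isolation. The sketch below is a simplified stand-in for the drain loop, not the actual script: `drain`, `is_disabled` and `process_round` are hypothetical names that replace `db.is_drain_disabled(...)` and `pl.process_pending_extractions(...)` with in-memory fakes. It only shows that a flag flipped mid-run is honoured at the next round boundary.

```python
import asyncio


async def drain(is_disabled, process_round, max_empty_rounds=2):
    """Run rounds until the queue looks empty or the kill-switch flips.

    `is_disabled` and `process_round` are hypothetical async callables
    standing in for the real DB check and extraction service call.
    """
    total = 0
    empty_rounds = 0
    while empty_rounds < max_empty_rounds:
        # Gate at the top of every round, not only at startup.
        if await is_disabled():
            return total, "stopped"
        processed = await process_round()
        total += processed
        empty_rounds = empty_rounds + 1 if processed == 0 else 0
    return total, "done"


async def demo():
    rounds_run = 0

    async def process_round():
        nonlocal rounds_run
        rounds_run += 1
        return 4  # pretend every round handles a full batch of 4 cases

    async def is_disabled():
        # Assumption for the demo: the operator disables the drain
        # once two rounds have completed.
        return rounds_run >= 2

    return await drain(is_disabled, process_round)


result = asyncio.run(demo())
print(result)  # (8, 'stopped')
```

Two rounds finish before the flag is seen, so the run stops with 8 cases handled and a third round never starts. The real drain relies on per-chunk checkpoints for the case in flight, which this sketch does not model.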

SCRIPTS.md updated for both drain and supervisor.

Invariants: G1 (fix at source — the disable control honoured along every path,
not just at startup); G2 (no parallel control path — same drain_controls flag).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-13 09:03:07 +00:00

95 lines
4.2 KiB
Python

"""Drain the halacha extraction queue for the incoming batch.
Calls the canonical process_pending_extractions(kind='halacha') in small batches
until the queue is empty (two consecutive zero-progress rounds). Serial + global
advisory-lock coordinated inside the service — avoids concurrent Claude load spikes.
NIGHT-WINDOW: halacha extraction is slow (Opus, ~10 min/case) and token-heavy, so
the backlog drain runs ONLY in an off-hours window (default 23:00-05:00 Israel
time) — it never competes with daytime interactive work or other agents. A tick
that starts at 23:00 keeps going until the queue empties OR the window closes
(checked before every round); whatever's left resumes the next night (FIFO +
per-chunk checkpoint → no lost or duplicated work). Single-case extraction
requested by the chair goes through the CEO immediately and is NOT gated here.
Window is DST-safe (zoneinfo) — the host runs in UTC. Env overrides:
HALACHA_DRAIN_WINDOW_START / _END (hours, 0-23) · HALACHA_DRAIN_TZ.
Run: mcp-server/.venv/bin/python scripts/drain_halacha_queue.py
"""
import asyncio
import os
import sys
from datetime import datetime
from zoneinfo import ZoneInfo
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "mcp-server", "src"))
from legal_mcp.services import db
from legal_mcp.services import precedent_library as pl
_TZ = ZoneInfo(os.environ.get("HALACHA_DRAIN_TZ", "Asia/Jerusalem"))
_WINDOW_START = int(os.environ.get("HALACHA_DRAIN_WINDOW_START", "23"))
_WINDOW_END = int(os.environ.get("HALACHA_DRAIN_WINDOW_END", "5"))


def _in_window() -> bool:
    """True iff the current Israel-time hour is inside [START, END).

    Handles the midnight wrap (e.g. 23→5): the window is the union of
    [START,24) and [0,END). If START == END the window is treated as 'always'.
    """
    if _WINDOW_START == _WINDOW_END:
        return True
    hour = datetime.now(_TZ).hour
    if _WINDOW_START < _WINDOW_END:  # same-day window, e.g. 1→5
        return _WINDOW_START <= hour < _WINDOW_END
    return hour >= _WINDOW_START or hour < _WINDOW_END  # wraps midnight, e.g. 23→5


async def main():
    # /operations "disable" switch — no-op immediately if turned off (pm2
    # cron_restart can still fire a stopped job, so the gate lives in the DB).
    if await db.is_drain_disabled("legal-halacha-drain"):
        print("===SKIP=== legal-halacha-drain disabled via /operations", flush=True)
        return
    if not _in_window():
        now = datetime.now(_TZ).strftime("%H:%M %Z")
        print(f"===SKIP=== outside drain window {_WINDOW_START:02d}:00-"
              f"{_WINDOW_END:02d}:00 (now {now})", flush=True)
        return

    total = 0
    empty_rounds = 0
    rnd = 0
    while empty_rounds < 2:
        # Re-check the /operations kill-switch each round so a drain disabled
        # mid-run halts at the next round boundary (not only at startup). The UI
        # toggle + supervisor stop the process outright; this is the in-process
        # backstop. Per-chunk checkpoints mean the current case loses nothing.
        if await db.is_drain_disabled("legal-halacha-drain"):
            print(f"===STOP=== disabled via /operations mid-run — halting "
                  f"({total} cases this run; resumes when re-enabled)", flush=True)
            break
        if not _in_window():
            print(f"===STOP=== drain window closed ({_WINDOW_END:02d}:00) — "
                  f"{total} cases this run; rest resumes next night", flush=True)
            break
        rnd += 1
        out = await pl.process_pending_extractions(kind="halacha", limit=4)
        processed = out.get("processed", 0)
        total_pending = out.get("total_pending", 0)
        total += processed
        print(f"[round {rnd}] processed={processed} total_pending={total_pending} status={out.get('status')}", flush=True)
        for r in out.get("results", []):
            print(f" {r.get('case_number')}: {r.get('status')} stored={r.get('stored')} retry={r.get('retry_attempts')}", flush=True)
        if processed == 0:
            empty_rounds += 1
            await asyncio.sleep(5)
        else:
            empty_rounds = 0
    print(f"\n===DONE=== total halachot rounds processed; cases handled cumulatively={total}", flush=True)


if __name__ == "__main__":
    asyncio.run(main())
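
The midnight-wrap rule in `_in_window` is easier to check when the hour is passed in rather than read from the clock. The following is a minimal sketch under that assumption: `in_window(hour, start, end)` is a hypothetical pure variant of the function above, not part of the script, and the defaults mirror the documented 23 and 5.

```python
def in_window(hour: int, start: int = 23, end: int = 5) -> bool:
    """Pure variant of the window check: is `hour` inside [start, end)?

    Hypothetical helper for illustration; the script's _in_window reads
    the hour from datetime.now(_TZ) instead of taking it as an argument.
    """
    if start == end:
        return True  # degenerate window is treated as "always open"
    if start < end:
        return start <= hour < end  # same-day window, e.g. 1 to 5
    return hour >= start or hour < end  # wraps midnight, e.g. 23 to 5


# Hours of the day that fall inside the default 23 to 5 window.
open_hours = [h for h in range(24) if in_window(h)]
print(open_hours)  # [0, 1, 2, 3, 4, 23]
```

With the defaults, 23:00 through 04:59 is open and 05:00 is already closed, because the end bound is exclusive.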