Files
legal-ai/scripts/halacha_drain_supervisor.py
Chaim 49efa94d60
All checks were successful
G12 Leak-Guard / leak-guard (pull_request) Successful in 4s
fix(halacha): authoritative rate-limit detection + early-morning catch-up window (supervisor)
הדריינר רץ בלילה 13→14.6 אך חילץ 0 הלכות: מכסת-המנוי של claude.ai אזלה
(`api_error_status:429 "You've hit your session limit · resets 2:30am UTC"`,
`total_cost_usd:0`), וה-reset (5:30 IDT) נחת מעט אחרי סגירת החלון (05:00 IDT).
המתזמר סיווג זאת שגוי כ-"hung" ועשה restart-storm כל 15 דק' — כי `scan_rate_limit`
קורא רק 120 שורות-זנב, וה-429 (שורה 8273/9170) נקבר תחת ~900 שורות teardown שה-churn
שלו עצמו ייצר. בנוסף "hold" לא עצר את הדריינר → המשך הלמת-429 ובזבוז המכסה.

Fix A — זיהוי rate-limit עמיד:
  • `quota_exhausted()` חדש: מקור-האמת הוא endpoint-המכסה (`subscription_usage`,
    אותו util שה-UI מציג) — durable, לא תלוי בעומק-זנב-הלוג. log-scrape רק כ-fallback.
  • בזמן מוגבל עוצר דריינר online (`hold-stopped`) כדי לא להלום 429; מצית-מחדש
    כשהמכסה חוזרת (exit מיידי כש-endpoint <100%, או probe `claude -p` אם endpoint למטה).

Fix B — חלון catch-up בוקר [05:00–07:00 IDT):
  • נפתח רק לניקוי backlog שנותר כשהמכסה חזרה (מגודר: לא-מוגבל + תור≠ריק) כדי
    שהמכסה המשוחררת לא תתבזבז עד הלילה הבא. הקצה המורחב מועבר לדריינר (window self-guard).

נתונים בטוחים — תיקים נשארו 'processing' for retry, שום הלכה לא אבדה.
13 unit-tests עוברים (parse endpoint, gating של catch-band, win extension); `status` חי OK.

Invariants: מקיים G1 (תיקון-במקור: זיהוי ממקור-מכסה סמכותי, לא מתסמין-לוג),
G2 (אותו endpoint+מנגנון-חלון קיימים — בלי מסלול מקביל), INV-G3/X16 (לא נוגע
ב-checkpointing הדטרמיניסטי). G12 לא רלוונטי (host-side pm2, בלי Paperclip).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-14 10:02:35 +00:00

642 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Halacha-drain supervisor — permanent health manager for legal-halacha-drain.
Babysits the EXISTING pm2 drain so the halacha-extraction backlog gets worked
reliably. Takes ZERO Claude quota itself (only the drain calls Opus); it just
reads DB/logs/pm2 and pokes the drain via the established run-now mechanism.
Subcommands (argparse):
tick (default — what the pm2 cron fires every 15 min) run ONE health tick
burst-on start a daytime BURST — drain runs continuously until the upcoming
Saturday 18:00 IDT (or --until). MANUAL ONLY; kicks off immediately.
burst-off stop the burst now (revert to night mode) and halt the running drain
status print current mode + queue + drain state (no action)
BURST is the chair's manual "run continuously now" window. Source of truth is the
DB (`drain_controls.burst_until`, keyed by the drain's pm2 name) — the SAME value
the /operations page reads/writes (G1, single source; G2, no parallel control
path). burst-on/off here and the /operations toggle are equivalent front-ends.
One tick: reads queue + per-chunk staging + burst_until from the DB, pm2 status,
log tails →
• re-triggers the one-shot drain when idle and the queue is non-empty
• restarts a HUNG run (online but no new chunk-checkpoint for > 25 min — the
REAL liveness signal; the out-log only updates when a whole CASE finishes)
• backs off on rate-limit until quota resets — PRIMARY signal is the authoritative
OAuth usage endpoint (durable; the same util the Claude Code UI shows), with the
log 429 only as a fallback when that endpoint is unreachable. While limited it
STOPS the drain (no 429-hammering) and re-ignites once quota is back.
• verifies crash-safe per-chunk staging is committing (nothing lost)
The night window is 23:0005:00 IDT, with a bounded early-morning CATCH-UP band
[05:0007:00 IDT) that opens ONLY to drain a leftover backlog once quota is back —
a late 5-hour reset commonly lands just past 05:00, and this keeps the freed quota
from idling until the next night (gated on quota + non-empty queue).
TWO MODES (never self-stops):
• burst — while burst_until is set and in the future: window LIFTED
(HALACHA_DRAIN_WINDOW_START==END==0), drain runs all day.
• night — otherwise: the drain's normal 23:0005:00 IDT window; the supervisor
stays on as the permanent nightly-drain health manager.
Burst auto-EXPIRES at its deadline (DB cleared + running drain stopped) so it
never bleeds into a fresh week's quota uninvited.
"""
import argparse
import json
import os
import re
import subprocess
import urllib.request
from datetime import datetime, timedelta, timezone
from glob import glob
REPO = "/home/chaim/legal-ai"
# claude.ai subscription usage — the same 5-hour / 7-day utilization the Claude
# Code status bar shows, via the (undocumented) OAuth usage endpoint. The token
# lives in the CLI's own credentials file; the claude-code User-Agent is
# REQUIRED — without it the request lands in an aggressively rate-limited bucket
# and 429s. Unofficial endpoint: may change, so callers must tolerate None.
CLAUDE_CRED_PATH = "/home/chaim/.claude/.credentials.json"
OAUTH_USAGE_URL = "https://api.anthropic.com/api/oauth/usage"
_USAGE_UA = "claude-code/2.1.177"
RUNTIME_DIR = "/home/chaim/halacha-drain-monitor" # state (outside repo)
STATE = os.path.join(RUNTIME_DIR, "state.json")
DRAIN = "legal-halacha-drain"
OUT_LOG = "/home/chaim/.pm2/logs/legal-halacha-drain-out.log"
ERR_LOG = "/home/chaim/.pm2/logs/legal-halacha-drain-error.log"
VENV_PY = os.path.join(REPO, "mcp-server/.venv/bin/python")
STUCK_SILENCE_SEC = 1500 # 25 min with no new chunk-checkpoint while online → hung
WEEKLY_GAP_HOURS = 6 # reset further than this → treat as weekly, not 5h
IDT = timezone(timedelta(hours=3)) # Israel summer time (IDT, UTC+3) — display only
NIGHT_START, NIGHT_END = 23, 5 # the drain's normal window (IDT hours)
CATCHUP_END = 7 # soft window end (IDT) for early-morning catch-up — see fix B
def _now_utc():
return datetime.now(timezone.utc)
def pm2_bin():
for c in ["pm2", *glob("/home/chaim/.nvm/versions/node/*/bin/pm2")]:
try:
subprocess.run([c, "-v"], capture_output=True, timeout=10)
return c
except Exception:
continue
return "pm2"
def claude_bin():
"""Resolve the claude CLI — PATH may be bare under pm2 cron, so fall back to
the known install location (same binary the drain's claude_session uses)."""
for c in ["claude", "/home/chaim/.local/bin/claude",
*glob("/home/chaim/.nvm/versions/node/*/bin/claude")]:
try:
if subprocess.run([c, "--version"], capture_output=True,
timeout=10).returncode == 0:
return c
except Exception:
continue
return "claude"
PM2 = pm2_bin()
CLAUDE = claude_bin()
_ENV = {**os.environ, "HOME": "/home/chaim"}
def subscription_usage() -> dict | None:
"""Read the claude.ai subscription usage — the exact 5-hour / 7-day
utilization the Claude Code UI shows — from the OAuth usage endpoint.
Returns the parsed JSON (keys: five_hour, seven_day, seven_day_opus,
seven_day_sonnet, extra_usage; each window → {utilization 0-100, resets_at})
or None on ANY failure. Undocumented endpoint — every caller must tolerate
None and fall back."""
try:
with open(CLAUDE_CRED_PATH) as f:
token = json.load(f)["claudeAiOauth"]["accessToken"]
except Exception:
return None
req = urllib.request.Request(OAUTH_USAGE_URL, headers={
"Authorization": f"Bearer {token}",
"User-Agent": _USAGE_UA, # required — else aggressive 429
"anthropic-beta": "oauth-2025-04-20",
})
try:
with urllib.request.urlopen(req, timeout=15) as resp:
return json.loads(resp.read().decode("utf-8"))
except Exception:
return None
def quota_available() -> bool:
"""Is the claude.ai quota actually usable right now?
Primary: read the authoritative utilization from the OAuth usage endpoint
(subscription_usage) and treat a window as exhausted only at >=100%. Cheaper
and more precise than a probe — no Opus call, and it sees every limit
(5-hour, weekly all-models, weekly-Sonnet) the way the UI does. The 429 reset
time claude.ai reports is often conservative, so this resumes the drain the
moment a window actually frees up rather than waiting blindly.
Fallback (endpoint unreachable — it is undocumented): a tiny `claude -p`
probe via the official CLI. Conservative on failure: any non-zero exit,
timeout, or limit message → treat as still limited."""
usage = subscription_usage()
if usage is not None:
# A drain run needs the 5-hour window, the weekly all-models cap, AND
# the weekly per-model cap all below 100%. On this account the per-model
# cap that's actually populated is Sonnet (seven_day_opus is null — no
# separate Opus cap); the all-models seven_day cap is the backstop for
# Opus usage either way. null utilization → treated as 0% (not limiting).
windows = ("five_hour", "seven_day", "seven_day_sonnet")
utils = [(usage.get(w) or {}).get("utilization") for w in windows]
# utilization may be None (window inactive / no data) → treat as 0%.
return all((u or 0) < 100 for u in utils)
# ── fallback: official-CLI probe ──
try:
r = subprocess.run([CLAUDE, "-p", "Reply with exactly: OK"],
capture_output=True, text=True, timeout=60, env=_ENV,
cwd=REPO)
except Exception:
return False
if r.returncode != 0:
return False
out = ((r.stdout or "") + (r.stderr or ""))
low = out.lower()
if "usage limit" in low or "session limit" in low or 'api_error_status":429' in out:
return False
return "OK" in out
def quota_exhausted():
"""Authoritative rate-limit signal from the OAuth usage endpoint — the SAME
source the Claude Code UI (and quota_available) reads. Durable, unlike
scan_rate_limit's log tail, which loses the 429 the moment it scrolls out from
under the supervisor's own restart-churn (the bug that let a 5-hour exhaustion
masquerade as 'hung' all night).
Returns (exhausted: bool, earliest_reset_utc: datetime|None), or None when the
endpoint is unreachable (caller falls back to the log scrape). A window counts
as exhausting the drain at >=100% utilization — same windows quota_available
gates on (5-hour, weekly all-models, weekly-Sonnet)."""
usage = subscription_usage()
if usage is None:
return None
exhausted, resets = False, []
for w in ("five_hour", "seven_day", "seven_day_sonnet"):
info = usage.get(w) or {}
if (info.get("utilization") or 0) >= 100:
exhausted = True
r = info.get("resets_at")
if r:
try:
resets.append(datetime.fromisoformat(r).astimezone(timezone.utc))
except Exception:
pass
return exhausted, (min(resets) if resets else None)
# ── DB access (via the repo venv; the module self-configures) ────────────────
def _venv_py(code: str, timeout: int = 120) -> str:
r = subprocess.run([VENV_PY, "-c", code], capture_output=True, text=True,
cwd=REPO, timeout=timeout, env=_ENV)
if r.returncode != 0:
raise RuntimeError((r.stderr or r.stdout)[:300])
return r.stdout.strip()
def db_snapshot() -> dict:
"""Queue + staging counts + burst_until, in one round-trip."""
code = (
"import sys,os,asyncio,json\n"
"sys.path.insert(0,'mcp-server/src'); os.environ.setdefault('HOME','/home/chaim')\n"
"from legal_mcp.services import db\n"
"async def m():\n"
" pool=await db.get_pool()\n"
" async with pool.acquire() as c:\n"
" st={r['halacha_extraction_status'] or 'unknown':r['n'] for r in await c.fetch(\"SELECT halacha_extraction_status,count(*) n FROM case_law GROUP BY 1\")}\n"
" procs=[r['case_number'] for r in await c.fetch(\"SELECT case_number FROM case_law WHERE halacha_extraction_status='processing' ORDER BY halacha_extraction_requested_at NULLS LAST LIMIT 5\")]\n"
" hal=await c.fetchval('SELECT count(*) FROM halachot')\n"
" ck=await c.fetchval('SELECT count(*) FROM precedent_chunks WHERE halacha_extracted_at IS NOT NULL')\n"
" pend_rev=await c.fetchval(\"SELECT count(*) FROM halachot WHERE review_status='pending_review'\")\n"
" bu=await c.fetchval(\"SELECT burst_until FROM drain_controls WHERE name='legal-halacha-drain'\")\n"
" dis=await c.fetchval(\"SELECT disabled FROM drain_controls WHERE name='legal-halacha-drain'\")\n"
" print(json.dumps({'status_counts':st,'processing_cases':procs,'halachot_total':hal,'checkpointed_chunks':ck,'pending_review':pend_rev,'burst_until':bu.isoformat() if bu else None,'disabled':bool(dis)}))\n"
"asyncio.run(m())\n"
)
return json.loads(_venv_py(code).splitlines()[-1])
_SET_BURST_SQL = (
"INSERT INTO drain_controls (name, burst_until, updated_at) "
"VALUES ('legal-halacha-drain', $1, now()) "
"ON CONFLICT (name) DO UPDATE SET burst_until = $1, updated_at = now()"
)
def db_set_burst(until_iso):
# Raw SQL via get_pool (NOT db.set_drain_burst): the host repo checkout can lag
# main, so the supervisor depends only on get_pool + the burst_until column —
# the column is the shared contract with the container-side /operations API.
code = (
"import sys,os,asyncio\n"
"sys.path.insert(0,'mcp-server/src'); os.environ.setdefault('HOME','/home/chaim')\n"
"from datetime import datetime\n"
"from legal_mcp.services import db\n"
f"v={until_iso!r}\n"
f"SQL={_SET_BURST_SQL!r}\n"
"async def m():\n"
" pool=await db.get_pool()\n"
" async with pool.acquire() as c:\n"
" await c.execute(SQL, datetime.fromisoformat(v) if v else None)\n"
"asyncio.run(m())\n"
)
_venv_py(code, timeout=60)
def db_get_burst():
code = (
"import sys,os,asyncio\n"
"sys.path.insert(0,'mcp-server/src'); os.environ.setdefault('HOME','/home/chaim')\n"
"from legal_mcp.services import db\n"
"async def m():\n"
" pool=await db.get_pool()\n"
" async with pool.acquire() as c:\n"
" v=await c.fetchval(\"SELECT burst_until FROM drain_controls WHERE name='legal-halacha-drain'\")\n"
" print(v.isoformat() if v else '')\n"
"asyncio.run(m())\n"
)
out = _venv_py(code, timeout=60).strip()
return datetime.fromisoformat(out) if out else None
def next_saturday_18_utc():
now_idt = _now_utc().astimezone(IDT)
days = (5 - now_idt.weekday()) % 7 # Mon=0..Sat=5..Sun=6
cand = now_idt.replace(hour=18, minute=0, second=0, microsecond=0) + timedelta(days=days)
if cand <= now_idt:
cand += timedelta(days=7)
return cand.astimezone(timezone.utc)
# ── state ────────────────────────────────────────────────────────────────────
def load_state():
try:
with open(STATE) as f:
return json.load(f)
except Exception:
return {}
def save_state(s):
os.makedirs(RUNTIME_DIR, exist_ok=True)
tmp = STATE + ".tmp"
with open(tmp, "w") as f:
json.dump(s, f, ensure_ascii=False, indent=2)
os.replace(tmp, STATE)
# ── pm2 / logs ────────────────────────────────────────────────────────────────
def pm2_status():
try:
r = subprocess.run([PM2, "jlist"], capture_output=True, text=True, timeout=30)
for p in json.loads(r.stdout):
if p.get("name") == DRAIN:
return p.get("pm2_env", {}).get("status", "unknown")
except Exception as e:
return f"err:{e}"
return "absent"
def log_age_sec():
try:
return _now_utc().timestamp() - os.path.getmtime(OUT_LOG)
except Exception:
return None
def tail(path, n=120):
try:
with open(path, errors="replace") as f:
return f.readlines()[-n:]
except Exception:
return []
def scan_rate_limit():
"""(recent_429, reset_dt_utc|None) — a 429 counts as recent only if it is the
LAST drain outcome (after the last 'completed' line)."""
lines = tail(OUT_LOG, 200) + tail(ERR_LOG, 120)
last_429, last_ok, reset_dt = -1, -1, None
for i, ln in enumerate(lines):
if 'api_error_status":429' in ln or "hit your session limit" in ln or "usage limit" in ln.lower():
last_429 = i
m = re.search(r"resets\s+(\d{1,2}):(\d{2})\s*([ap]m)\s*\(UTC\)", ln, re.I)
if m:
hh, mm, ap = int(m.group(1)), int(m.group(2)), m.group(3).lower()
if ap == "pm" and hh != 12:
hh += 12
if ap == "am" and hh == 12:
hh = 0
now = _now_utc()
cand = now.replace(hour=hh, minute=mm, second=0, microsecond=0)
if cand <= now - timedelta(minutes=1):
cand += timedelta(days=1)
reset_dt = cand
if "completed stored=" in ln or ("[round" in ln and "processed=" in ln):
last_ok = i
return (last_429 > last_ok and last_429 >= 0), reset_dt
# ── actions ───────────────────────────────────────────────────────────────────
def trigger_drain(win_start, win_end):
"""Restart the one-shot drain, pushing its window env (idempotent, reboot-safe)."""
env = {**_ENV, "HALACHA_DRAIN_WINDOW_START": str(win_start),
"HALACHA_DRAIN_WINDOW_END": str(win_end)}
r = subprocess.run([PM2, "restart", DRAIN, "--update-env"],
capture_output=True, text=True, env=env, timeout=60)
return r.returncode == 0, (r.stderr or r.stdout)[:200]
def stop_drain():
r = subprocess.run([PM2, "stop", DRAIN], capture_output=True, text=True, timeout=60)
return r.returncode == 0
# ── the tick ──────────────────────────────────────────────────────────────────
def tick():
now = _now_utc()
prev = load_state()
notes = []
try:
snap = db_snapshot()
except Exception as e:
save_state({**prev, "tick_at": now.isoformat(), "mode": "db_error"})
print("⚠️ כשל בקריאת ה-DB:", e)
print("JSON:" + json.dumps({"ok": False, "error": str(e), "action": "none",
"mode": "db_error", "next_wake_sec": 900}, ensure_ascii=False))
return
# /operations "disabled" switch — highest priority. The drain self-guards at
# startup, but a process mid-run wouldn't notice the flag, and the cron keeps
# firing it; so when disabled we stop any running drain and never re-trigger,
# regardless of burst/window. (The UI toggle also stops it immediately via the
# bridge; this is the backstop + the "don't re-ignite a disabled drain" gate.)
if snap.get("disabled"):
stopped = stop_drain()
save_state({**prev, "tick_at": now.isoformat(), "mode": "disabled",
"action": "stopped-disabled" if stopped else "already-stopped"})
print(f"🛑 {now.astimezone(IDT):%H:%M:%S IDT} | מצב: disabled | "
f"מסומן לא-פעיל ב-/operations — הדריינר {'נעצר' if stopped else 'כבר עצור'}.")
print("JSON:" + json.dumps(
{"ok": True, "mode": "disabled",
"action": "stopped-disabled" if stopped else "already-stopped",
"next_wake_sec": 900}, ensure_ascii=False))
return
# burst state from the DB + auto-expiry (manual on; auto-off at the deadline)
burst_until = datetime.fromisoformat(snap["burst_until"]) if snap.get("burst_until") else None
if burst_until and now >= burst_until:
try:
db_set_burst(None)
except Exception as e:
notes.append(f"(אזהרה: ניקוי burst ב-DB נכשל: {e})")
stop_drain()
notes.append(f"ה-burst הסתיים ({burst_until.astimezone(IDT):%a %H:%M IDT}) — חזרה למצב לילה, הדריינר נעצר.")
burst_until = None
burst = bool(burst_until and now < burst_until)
idt_hour = now.astimezone(IDT).hour
core_open = (idt_hour >= NIGHT_START) or (idt_hour < NIGHT_END)
in_catchup_band = (NIGHT_END <= idt_hour < CATCHUP_END) # [05:00,07:00) IDT
phase = "burst" if burst else "night"
# window_open / win are finalized AFTER the cooldown + backlog are known — the
# early-morning catch-up band (fix B) is gated on quota being back and a
# leftover backlog, so it can't be decided here yet.
sc = snap["status_counts"]
pending = int(sc.get("pending", 0))
processing = int(sc.get("processing", 0))
done = int(sc.get("completed", 0))
failed = int(sc.get("failed", 0))
hal = snap["halachot_total"]
ck = snap["checkpointed_chunks"]
proc_cases = snap["processing_cases"]
status = pm2_status()
age = log_age_sec()
rl_recent, reset_dt = scan_rate_limit()
d_hal = hal - prev.get("halachot_total", hal)
d_ck = ck - prev.get("checkpointed_chunks", ck)
d_done = done - prev.get("done", done)
# cooldown — the authoritative OAuth usage endpoint is PRIMARY (durable); the
# log scrape is the fallback only when the endpoint is unreachable. This is the
# core of fix A: log-scraping alone went blind once a 429 scrolled out from
# under the supervisor's own restart-churn, so an exhausted 5-hour window read
# as 'hung' and got hammered with restarts. The endpoint can't scroll away.
fresh = (age is not None and age < 1800)
log_rl = bool(rl_recent and fresh)
auth = quota_exhausted() # (exhausted, reset_utc) | None if endpoint down
auth_says_ok = (auth is not None and not auth[0])
cd_dt = None
if auth is not None and auth[0]: # authoritative: a window is exhausted
cd_dt = auth[1] or reset_dt # prefer endpoint reset; fall back to parsed
elif log_rl and not auth_says_ok: # endpoint down/silent → trust a fresh 429
cd_dt = reset_dt
if cd_dt is None and prev.get("cooldown_until"): # persist a stored future reset
try:
cd_dt = datetime.fromisoformat(prev["cooldown_until"])
except Exception:
cd_dt = None
in_cooldown = bool(cd_dt and now < cd_dt)
# Exit cooldown the instant quota is actually back — claude.ai usually frees up
# EARLIER than the reported reset. Authoritative all-clear is decisive; when the
# endpoint is down, fall back to the tiny live CLI probe (old behavior).
if in_cooldown and (auth_says_ok or (auth is None and quota_available())):
notes.append(
f"בדיקת-מכסה: המכסה זמינה — מתחדש מיד "
f"(לפני האיפוס המדווח {cd_dt.astimezone(IDT):%H:%M IDT}).")
cd_dt, in_cooldown = None, False
cooldown_until = cd_dt.isoformat() if cd_dt else None
weekly = bool(cd_dt and (cd_dt - now) > timedelta(hours=WEEKLY_GAP_HOURS))
# fix B — early-morning catch-up window. The fixed night window ends 05:00 IDT,
# but the claude.ai 5-hour quota reset drifts and on a rate-limited night
# commonly lands just after it; without this the freed quota idles until the
# next night. Open a bounded catch-up band [05:0007:00 IDT) to clear a LEFTOVER
# backlog once quota is back. Gated on (not in_cooldown) → never opens while
# still limited; gated on a non-empty queue → a clean early morning never drags
# the drain into the day. The extended end is also passed to the drain so its
# own window self-guard accepts the catch-up rounds.
backlog = (pending > 0 or processing > 0)
catchup = bool(in_catchup_band and backlog and not in_cooldown)
window_open = burst or core_open or catchup
win = (0, 0) if burst else (NIGHT_START, CATCHUP_END if catchup else NIGHT_END)
if catchup and not core_open:
notes.append(f"catch-up בוקר ({NIGHT_END}:000{CATCHUP_END}:00) — מנצל מכסה "
f"משוחררת לסיום {pending} בתור (איפוס מאוחר אחרי החלון).")
# progress-based liveness (chunk checkpoints, NOT log mtime)
if ck > prev.get("checkpointed_chunks", ck):
last_progress_at = now
else:
lp = prev.get("last_progress_at")
try:
last_progress_at = datetime.fromisoformat(lp) if lp else now
except Exception:
last_progress_at = now
progress_age = (now - last_progress_at).total_seconds()
hung = (status == "online" and prev.get("last_progress_at") is not None
and progress_age > STUCK_SILENCE_SEC)
# ── decide ONE action ──
action, detail, mode = "none", "", "running"
if pending == 0 and processing == 0:
mode = "done"
notes.append("התור ריק — אין מה לחלץ.")
elif in_cooldown:
mode = "weekly_exhausted" if weekly else "ratelimited"
# Stop a running drain while limited — otherwise it keeps spawning Opus
# calls that 429 on every chunk, burning the very quota we're waiting on
# (and burying the 429 signal under teardown noise). It re-ignites via the
# normal trigger path once cooldown clears.
if status == "online":
stop_drain()
action = "hold-stopped"
notes.append(f"rate-limit פעיל — הדריינר נעצר כדי לא לבזבז מכסה על 429; "
f"איפוס ~{cd_dt.astimezone(IDT):%H:%M IDT}.")
else:
action = "hold"
notes.append(f"rate-limit פעיל; איפוס ~{cd_dt.astimezone(IDT):%H:%M IDT}.")
elif hung:
mode = "hung"
ok, detail = trigger_drain(*win)
action = "restart-hung" if ok else "restart-hung-failed"
notes.append(f"זוהתה תקיעה ({int(progress_age)}ש' ללא checkpoint חדש) — בוצע restart.")
elif status == "online":
mode = "running"
notes.append(f"רץ ({len(proc_cases)} בעיבוד, התקדמות לפני {int(progress_age)}ש').")
elif not window_open:
mode = "idle_off_window"
notes.append(f"מחוץ לחלון-הלילה ({NIGHT_START}:000{NIGHT_END}:00) — ממתין; {pending} בתור.")
else:
ok, detail = trigger_drain(*win)
action = "triggered" if ok else "trigger-failed"
notes.append(f"הדריינר היה בטל — הוצת ({phase})." if ok else f"הצתה נכשלה: {detail}")
staging_ok = not (d_done > 0 and d_ck == 0)
if not staging_ok:
notes.append("⚠️ תיקים הושלמו אך checkpoints לא התקדמו — לבדוק staging.")
if mode in ("ratelimited", "weekly_exhausted") and cd_dt:
next_wake = max(300, min(int((cd_dt - now).total_seconds()) + 180, 3600))
elif mode in ("done", "idle_off_window"):
next_wake = 1800
else:
next_wake = 900
save_state({
"tick_at": now.isoformat(), "phase": phase,
"burst_until": burst_until.isoformat() if burst_until else None,
"halachot_total": hal, "checkpointed_chunks": ck, "done": done,
"pending": pending, "processing": processing, "processing_cases": proc_cases,
"last_progress_at": last_progress_at.isoformat(),
"cooldown_until": cooldown_until if in_cooldown else None,
"mode": mode, "action": action,
})
elapsed = (f" (מאז הקודם: +{d_hal} הלכות, +{d_ck} מקטעים, +{d_done} תיקים)"
if prev.get("tick_at") else "")
bu = f" burst→{burst_until.astimezone(IDT):%a %d/%m %H:%M}" if burst else ""
print(f"🕒 {now.astimezone(IDT):%H:%M:%S IDT} | מצב: {mode} ({phase}){bu} | פעולה: {action}")
print(f" תור: pending={pending} processing={processing} done={done} failed={failed}")
print(f" staging: halachot={hal} checkpointed_chunks={ck} pending_review={snap['pending_review']}{elapsed}")
print(f" process: pm2={status} progress_age={int(progress_age)}s staging_ok={staging_ok}")
if proc_cases:
print(f" בעיבוד עכשיו: {', '.join(proc_cases)}")
for n in notes:
print(f"{n}")
print("JSON:" + json.dumps({
"ok": True, "mode": mode, "phase": phase, "action": action, "next_wake_sec": next_wake,
"burst_until": burst_until.isoformat() if burst_until else None,
"pending": pending, "processing": processing, "done": done, "failed": failed,
"halachot_total": hal, "checkpointed_chunks": ck,
"delta_halachot": d_hal, "delta_checkpoints": d_ck, "delta_done": d_done,
"pm2": status, "progress_age_sec": int(progress_age),
"staging_ok": staging_ok, "rate_limited": in_cooldown, "cooldown_until": cooldown_until,
}, ensure_ascii=False))
def cmd_status():
bu = db_get_burst()
now = _now_utc()
burst = bool(bu and now < bu)
st = load_state()
print(f"מצב: {'BURST' if burst else 'night'}"
+ (f" → עד {bu.astimezone(IDT):%a %d/%m %H:%M IDT}" if burst else " (חלון 23:0005:00)"))
if st:
print(f"טיק אחרון: {st.get('tick_at','')} | mode={st.get('mode')} action={st.get('action')}")
print(f"תור: pending={st.get('pending')} processing={st.get('processing')} "
f"done={st.get('done')} | staging halachot={st.get('halachot_total')}")
print(f"דריינר pm2: {pm2_status()}")
usage = subscription_usage()
if usage:
def _w(key):
w = usage.get(key) or {}
u = w.get("utilization")
if u is None:
return ""
r = w.get("resets_at")
try:
rt = f" (איפוס {datetime.fromisoformat(r).astimezone(IDT):%H:%M}" if r else ""
rt += ")" if r else ""
except Exception:
rt = ""
return f"{u:.0f}%{rt}"
print(f"מכסת claude.ai: 5-שעות={_w('five_hour')} · שבועי={_w('seven_day')} · "
f"שבועי-Sonnet={_w('seven_day_sonnet')}")
else:
print("מכסת claude.ai: (endpoint לא זמין)")
def main():
ap = argparse.ArgumentParser(description="halacha-drain supervisor")
ap.add_argument("cmd", nargs="?", default="tick",
choices=["tick", "burst-on", "burst-off", "status"])
ap.add_argument("--until", help="burst deadline override, ISO or 'YYYY-MM-DD HH:MM' (IDT)")
a = ap.parse_args()
if a.cmd == "burst-on":
if a.until:
try:
until = datetime.fromisoformat(a.until)
except ValueError:
until = datetime.strptime(a.until, "%Y-%m-%d %H:%M")
if until.tzinfo is None:
until = until.replace(tzinfo=IDT)
else:
until = next_saturday_18_utc()
db_set_burst(until.isoformat())
print(f"✅ BURST הופעל — רץ עד {until.astimezone(IDT):%A %d/%m %H:%M IDT} (או burst-off).")
tick() # kick off immediately
elif a.cmd == "burst-off":
db_set_burst(None)
stopped = stop_drain()
print(f"🛑 BURST כובה. הדריינר {'נעצר' if stopped else 'לא נעצר (כבר עצור)'}; "
f"חזרה למצב לילה (23:0005:00).")
tick()
elif a.cmd == "status":
cmd_status()
else:
tick()
if __name__ == "__main__":
main()