Files
legal-ai/scripts/halacha_drain_supervisor.py
Chaim 013fe39ea7
All checks were successful
G12 Leak-Guard / leak-guard (pull_request) Successful in 8s
fix(supervisor): re-probe claude.ai quota instead of waiting blindly for the reported reset
When the halacha drain hit a 429, the supervisor recorded the reset time the
error reported (e.g. "resets 6:50pm UTC") and then HELD until that timestamp,
re-reading it from its own state every tick without ever checking whether quota
had actually returned. claude.ai usually frees up quota earlier than the message
claims, so the drain sat idle for hours after it could have resumed — and only a
manual kick (clear cooldown + trigger) got it going again.

Now, on any tick where we'd otherwise hold on a cooldown, run a cheap live probe
(`quota_available()` → a tiny `claude -p` call, cost ~0) and resume the instant
it succeeds — at most one probe per 15-min tick, only while we believe we're
limited. Conservative on failure (non-zero exit / timeout / limit message →
stay held), so a flaky probe never resumes the drain into a real 429. Adds a
claude_bin() resolver so the probe works under pm2 cron where PATH is bare.

Invariants: G1 (resume decision driven by actual quota state, not a guessed
timestamp); no new control path.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-13 09:35:45 +00:00

505 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Halacha-drain supervisor — permanent health manager for legal-halacha-drain.
Babysits the EXISTING pm2 drain so the halacha-extraction backlog gets worked
reliably. Takes ZERO Claude quota itself (only the drain calls Opus); it just
reads DB/logs/pm2 and pokes the drain via the established run-now mechanism.
Subcommands (argparse):
tick (default — what the pm2 cron fires every 15 min) run ONE health tick
burst-on start a daytime BURST — drain runs continuously until the upcoming
Saturday 18:00 IDT (or --until). MANUAL ONLY; kicks off immediately.
burst-off stop the burst now (revert to night mode) and halt the running drain
status print current mode + queue + drain state (no action)
BURST is the chair's manual "run continuously now" window. Source of truth is the
DB (`drain_controls.burst_until`, keyed by the drain's pm2 name) — the SAME value
the /operations page reads/writes (G1, single source; G2, no parallel control
path). burst-on/off here and the /operations toggle are equivalent front-ends.
One tick: reads queue + per-chunk staging + burst_until from the DB, pm2 status,
log tails →
• re-triggers the one-shot drain when idle and the queue is non-empty
• restarts a HUNG run (online but no new chunk-checkpoint for > 25 min — the
REAL liveness signal; the out-log only updates when a whole CASE finishes)
• backs off on rate-limit (claude_session 429) until the CLI's parsed reset;
a 429 only counts when the log is FRESH (an hours-old 429 is historical)
• verifies crash-safe per-chunk staging is committing (nothing lost)
TWO MODES (never self-stops):
• burst — while burst_until is set and in the future: window LIFTED
(HALACHA_DRAIN_WINDOW_START==END==0), drain runs all day.
• night — otherwise: the drain's normal 23:0005:00 IDT window; the supervisor
stays on as the permanent nightly-drain health manager.
Burst auto-EXPIRES at its deadline (DB cleared + running drain stopped) so it
never bleeds into a fresh week's quota uninvited.
"""
import argparse
import json
import os
import re
import subprocess
from datetime import datetime, timedelta, timezone
from glob import glob
REPO = "/home/chaim/legal-ai"
RUNTIME_DIR = "/home/chaim/halacha-drain-monitor" # state (outside repo)
STATE = os.path.join(RUNTIME_DIR, "state.json")
DRAIN = "legal-halacha-drain"
OUT_LOG = "/home/chaim/.pm2/logs/legal-halacha-drain-out.log"
ERR_LOG = "/home/chaim/.pm2/logs/legal-halacha-drain-error.log"
VENV_PY = os.path.join(REPO, "mcp-server/.venv/bin/python")
STUCK_SILENCE_SEC = 1500 # 25 min with no new chunk-checkpoint while online → hung
WEEKLY_GAP_HOURS = 6 # reset further than this → treat as weekly, not 5h
IDT = timezone(timedelta(hours=3)) # Israel summer time (IDT, UTC+3) — display only
NIGHT_START, NIGHT_END = 23, 5 # the drain's normal window (IDT hours)
def _now_utc():
return datetime.now(timezone.utc)
def pm2_bin():
for c in ["pm2", *glob("/home/chaim/.nvm/versions/node/*/bin/pm2")]:
try:
subprocess.run([c, "-v"], capture_output=True, timeout=10)
return c
except Exception:
continue
return "pm2"
def claude_bin():
"""Resolve the claude CLI — PATH may be bare under pm2 cron, so fall back to
the known install location (same binary the drain's claude_session uses)."""
for c in ["claude", "/home/chaim/.local/bin/claude",
*glob("/home/chaim/.nvm/versions/node/*/bin/claude")]:
try:
if subprocess.run([c, "--version"], capture_output=True,
timeout=10).returncode == 0:
return c
except Exception:
continue
return "claude"
PM2 = pm2_bin()
CLAUDE = claude_bin()
_ENV = {**os.environ, "HOME": "/home/chaim"}
def quota_available() -> bool:
"""Cheap live probe: is the claude.ai quota actually usable right now?
The 429 reset time claude.ai reports is often conservative — quota frees up
earlier. Rather than trust that timestamp and wait blindly, we re-probe with
a tiny `claude -p` call and resume the moment it succeeds. Conservative on
failure: any non-zero exit, timeout, or limit message → treat as still
limited (so a flaky probe never resumes the drain into a real 429)."""
try:
r = subprocess.run([CLAUDE, "-p", "Reply with exactly: OK"],
capture_output=True, text=True, timeout=60, env=_ENV,
cwd=REPO)
except Exception:
return False
if r.returncode != 0:
return False
out = ((r.stdout or "") + (r.stderr or ""))
low = out.lower()
if "usage limit" in low or "session limit" in low or 'api_error_status":429' in out:
return False
return "OK" in out
# ── DB access (via the repo venv; the module self-configures) ────────────────
def _venv_py(code: str, timeout: int = 120) -> str:
r = subprocess.run([VENV_PY, "-c", code], capture_output=True, text=True,
cwd=REPO, timeout=timeout, env=_ENV)
if r.returncode != 0:
raise RuntimeError((r.stderr or r.stdout)[:300])
return r.stdout.strip()
def db_snapshot() -> dict:
"""Queue + staging counts + burst_until, in one round-trip."""
code = (
"import sys,os,asyncio,json\n"
"sys.path.insert(0,'mcp-server/src'); os.environ.setdefault('HOME','/home/chaim')\n"
"from legal_mcp.services import db\n"
"async def m():\n"
" pool=await db.get_pool()\n"
" async with pool.acquire() as c:\n"
" st={r['halacha_extraction_status'] or 'unknown':r['n'] for r in await c.fetch(\"SELECT halacha_extraction_status,count(*) n FROM case_law GROUP BY 1\")}\n"
" procs=[r['case_number'] for r in await c.fetch(\"SELECT case_number FROM case_law WHERE halacha_extraction_status='processing' ORDER BY halacha_extraction_requested_at NULLS LAST LIMIT 5\")]\n"
" hal=await c.fetchval('SELECT count(*) FROM halachot')\n"
" ck=await c.fetchval('SELECT count(*) FROM precedent_chunks WHERE halacha_extracted_at IS NOT NULL')\n"
" pend_rev=await c.fetchval(\"SELECT count(*) FROM halachot WHERE review_status='pending_review'\")\n"
" bu=await c.fetchval(\"SELECT burst_until FROM drain_controls WHERE name='legal-halacha-drain'\")\n"
" dis=await c.fetchval(\"SELECT disabled FROM drain_controls WHERE name='legal-halacha-drain'\")\n"
" print(json.dumps({'status_counts':st,'processing_cases':procs,'halachot_total':hal,'checkpointed_chunks':ck,'pending_review':pend_rev,'burst_until':bu.isoformat() if bu else None,'disabled':bool(dis)}))\n"
"asyncio.run(m())\n"
)
return json.loads(_venv_py(code).splitlines()[-1])
_SET_BURST_SQL = (
"INSERT INTO drain_controls (name, burst_until, updated_at) "
"VALUES ('legal-halacha-drain', $1, now()) "
"ON CONFLICT (name) DO UPDATE SET burst_until = $1, updated_at = now()"
)
def db_set_burst(until_iso):
# Raw SQL via get_pool (NOT db.set_drain_burst): the host repo checkout can lag
# main, so the supervisor depends only on get_pool + the burst_until column —
# the column is the shared contract with the container-side /operations API.
code = (
"import sys,os,asyncio\n"
"sys.path.insert(0,'mcp-server/src'); os.environ.setdefault('HOME','/home/chaim')\n"
"from datetime import datetime\n"
"from legal_mcp.services import db\n"
f"v={until_iso!r}\n"
f"SQL={_SET_BURST_SQL!r}\n"
"async def m():\n"
" pool=await db.get_pool()\n"
" async with pool.acquire() as c:\n"
" await c.execute(SQL, datetime.fromisoformat(v) if v else None)\n"
"asyncio.run(m())\n"
)
_venv_py(code, timeout=60)
def db_get_burst():
code = (
"import sys,os,asyncio\n"
"sys.path.insert(0,'mcp-server/src'); os.environ.setdefault('HOME','/home/chaim')\n"
"from legal_mcp.services import db\n"
"async def m():\n"
" pool=await db.get_pool()\n"
" async with pool.acquire() as c:\n"
" v=await c.fetchval(\"SELECT burst_until FROM drain_controls WHERE name='legal-halacha-drain'\")\n"
" print(v.isoformat() if v else '')\n"
"asyncio.run(m())\n"
)
out = _venv_py(code, timeout=60).strip()
return datetime.fromisoformat(out) if out else None
def next_saturday_18_utc():
now_idt = _now_utc().astimezone(IDT)
days = (5 - now_idt.weekday()) % 7 # Mon=0..Sat=5..Sun=6
cand = now_idt.replace(hour=18, minute=0, second=0, microsecond=0) + timedelta(days=days)
if cand <= now_idt:
cand += timedelta(days=7)
return cand.astimezone(timezone.utc)
# ── state ────────────────────────────────────────────────────────────────────
def load_state():
try:
with open(STATE) as f:
return json.load(f)
except Exception:
return {}
def save_state(s):
os.makedirs(RUNTIME_DIR, exist_ok=True)
tmp = STATE + ".tmp"
with open(tmp, "w") as f:
json.dump(s, f, ensure_ascii=False, indent=2)
os.replace(tmp, STATE)
# ── pm2 / logs ────────────────────────────────────────────────────────────────
def pm2_status():
try:
r = subprocess.run([PM2, "jlist"], capture_output=True, text=True, timeout=30)
for p in json.loads(r.stdout):
if p.get("name") == DRAIN:
return p.get("pm2_env", {}).get("status", "unknown")
except Exception as e:
return f"err:{e}"
return "absent"
def log_age_sec():
try:
return _now_utc().timestamp() - os.path.getmtime(OUT_LOG)
except Exception:
return None
def tail(path, n=120):
try:
with open(path, errors="replace") as f:
return f.readlines()[-n:]
except Exception:
return []
def scan_rate_limit():
"""(recent_429, reset_dt_utc|None) — a 429 counts as recent only if it is the
LAST drain outcome (after the last 'completed' line)."""
lines = tail(OUT_LOG, 200) + tail(ERR_LOG, 120)
last_429, last_ok, reset_dt = -1, -1, None
for i, ln in enumerate(lines):
if 'api_error_status":429' in ln or "hit your session limit" in ln or "usage limit" in ln.lower():
last_429 = i
m = re.search(r"resets\s+(\d{1,2}):(\d{2})\s*([ap]m)\s*\(UTC\)", ln, re.I)
if m:
hh, mm, ap = int(m.group(1)), int(m.group(2)), m.group(3).lower()
if ap == "pm" and hh != 12:
hh += 12
if ap == "am" and hh == 12:
hh = 0
now = _now_utc()
cand = now.replace(hour=hh, minute=mm, second=0, microsecond=0)
if cand <= now - timedelta(minutes=1):
cand += timedelta(days=1)
reset_dt = cand
if "completed stored=" in ln or ("[round" in ln and "processed=" in ln):
last_ok = i
return (last_429 > last_ok and last_429 >= 0), reset_dt
# ── actions ───────────────────────────────────────────────────────────────────
def trigger_drain(win_start, win_end):
"""Restart the one-shot drain, pushing its window env (idempotent, reboot-safe)."""
env = {**_ENV, "HALACHA_DRAIN_WINDOW_START": str(win_start),
"HALACHA_DRAIN_WINDOW_END": str(win_end)}
r = subprocess.run([PM2, "restart", DRAIN, "--update-env"],
capture_output=True, text=True, env=env, timeout=60)
return r.returncode == 0, (r.stderr or r.stdout)[:200]
def stop_drain():
r = subprocess.run([PM2, "stop", DRAIN], capture_output=True, text=True, timeout=60)
return r.returncode == 0
# ── the tick ──────────────────────────────────────────────────────────────────
def tick():
now = _now_utc()
prev = load_state()
notes = []
try:
snap = db_snapshot()
except Exception as e:
save_state({**prev, "tick_at": now.isoformat(), "mode": "db_error"})
print("⚠️ כשל בקריאת ה-DB:", e)
print("JSON:" + json.dumps({"ok": False, "error": str(e), "action": "none",
"mode": "db_error", "next_wake_sec": 900}, ensure_ascii=False))
return
# /operations "disabled" switch — highest priority. The drain self-guards at
# startup, but a process mid-run wouldn't notice the flag, and the cron keeps
# firing it; so when disabled we stop any running drain and never re-trigger,
# regardless of burst/window. (The UI toggle also stops it immediately via the
# bridge; this is the backstop + the "don't re-ignite a disabled drain" gate.)
if snap.get("disabled"):
stopped = stop_drain()
save_state({**prev, "tick_at": now.isoformat(), "mode": "disabled",
"action": "stopped-disabled" if stopped else "already-stopped"})
print(f"🛑 {now.astimezone(IDT):%H:%M:%S IDT} | מצב: disabled | "
f"מסומן לא-פעיל ב-/operations — הדריינר {'נעצר' if stopped else 'כבר עצור'}.")
print("JSON:" + json.dumps(
{"ok": True, "mode": "disabled",
"action": "stopped-disabled" if stopped else "already-stopped",
"next_wake_sec": 900}, ensure_ascii=False))
return
# burst state from the DB + auto-expiry (manual on; auto-off at the deadline)
burst_until = datetime.fromisoformat(snap["burst_until"]) if snap.get("burst_until") else None
if burst_until and now >= burst_until:
try:
db_set_burst(None)
except Exception as e:
notes.append(f"(אזהרה: ניקוי burst ב-DB נכשל: {e})")
stop_drain()
notes.append(f"ה-burst הסתיים ({burst_until.astimezone(IDT):%a %H:%M IDT}) — חזרה למצב לילה, הדריינר נעצר.")
burst_until = None
burst = bool(burst_until and now < burst_until)
idt_hour = now.astimezone(IDT).hour
night_open = (idt_hour >= NIGHT_START) or (idt_hour < NIGHT_END)
window_open = burst or night_open
win = (0, 0) if burst else (NIGHT_START, NIGHT_END)
phase = "burst" if burst else "night"
sc = snap["status_counts"]
pending = int(sc.get("pending", 0))
processing = int(sc.get("processing", 0))
done = int(sc.get("completed", 0))
failed = int(sc.get("failed", 0))
hal = snap["halachot_total"]
ck = snap["checkpointed_chunks"]
proc_cases = snap["processing_cases"]
status = pm2_status()
age = log_age_sec()
rl_recent, reset_dt = scan_rate_limit()
d_hal = hal - prev.get("halachot_total", hal)
d_ck = ck - prev.get("checkpointed_chunks", ck)
d_done = done - prev.get("done", done)
# cooldown — fresh-gated; honor a stored future reset
fresh = (age is not None and age < 1800)
rl_active = bool(rl_recent and fresh)
cd_dt = None
if rl_active and reset_dt:
cd_dt = reset_dt
elif prev.get("cooldown_until"):
try:
cd_dt = datetime.fromisoformat(prev["cooldown_until"])
except Exception:
cd_dt = None
in_cooldown = bool(cd_dt and now < cd_dt)
# Don't trust the reported reset time — re-probe. claude.ai usually frees up
# quota EARLIER than the 429 message claims, and the old code then sat idle
# until that (conservative) timestamp. When we'd otherwise hold, a tiny live
# probe lets us resume the instant quota is actually back (≤ one tick), no
# manual kick. Runs at most once per tick and only while we think we're
# limited, so the cost is negligible.
if in_cooldown and quota_available():
notes.append(
f"בדיקת-מכסה הצליחה — המכסה חזרה לפני האיפוס המדווח "
f"({cd_dt.astimezone(IDT):%H:%M IDT}); מתחדש מיד.")
cd_dt = None
in_cooldown = False
cooldown_until = cd_dt.isoformat() if cd_dt else None
weekly = bool(cd_dt and (cd_dt - now) > timedelta(hours=WEEKLY_GAP_HOURS))
# progress-based liveness (chunk checkpoints, NOT log mtime)
if ck > prev.get("checkpointed_chunks", ck):
last_progress_at = now
else:
lp = prev.get("last_progress_at")
try:
last_progress_at = datetime.fromisoformat(lp) if lp else now
except Exception:
last_progress_at = now
progress_age = (now - last_progress_at).total_seconds()
hung = (status == "online" and prev.get("last_progress_at") is not None
and progress_age > STUCK_SILENCE_SEC)
# ── decide ONE action ──
action, detail, mode = "none", "", "running"
if pending == 0 and processing == 0:
mode = "done"
notes.append("התור ריק — אין מה לחלץ.")
elif in_cooldown:
mode = "weekly_exhausted" if weekly else "ratelimited"
action = "hold"
notes.append(f"rate-limit פעיל; איפוס ~{cd_dt.astimezone(IDT):%H:%M IDT}.")
elif hung:
mode = "hung"
ok, detail = trigger_drain(*win)
action = "restart-hung" if ok else "restart-hung-failed"
notes.append(f"זוהתה תקיעה ({int(progress_age)}ש' ללא checkpoint חדש) — בוצע restart.")
elif status == "online":
mode = "running"
notes.append(f"רץ ({len(proc_cases)} בעיבוד, התקדמות לפני {int(progress_age)}ש').")
elif not window_open:
mode = "idle_off_window"
notes.append(f"מחוץ לחלון-הלילה ({NIGHT_START}:000{NIGHT_END}:00) — ממתין; {pending} בתור.")
else:
ok, detail = trigger_drain(*win)
action = "triggered" if ok else "trigger-failed"
notes.append(f"הדריינר היה בטל — הוצת ({phase})." if ok else f"הצתה נכשלה: {detail}")
staging_ok = not (d_done > 0 and d_ck == 0)
if not staging_ok:
notes.append("⚠️ תיקים הושלמו אך checkpoints לא התקדמו — לבדוק staging.")
if mode in ("ratelimited", "weekly_exhausted") and cd_dt:
next_wake = max(300, min(int((cd_dt - now).total_seconds()) + 180, 3600))
elif mode in ("done", "idle_off_window"):
next_wake = 1800
else:
next_wake = 900
save_state({
"tick_at": now.isoformat(), "phase": phase,
"burst_until": burst_until.isoformat() if burst_until else None,
"halachot_total": hal, "checkpointed_chunks": ck, "done": done,
"pending": pending, "processing": processing, "processing_cases": proc_cases,
"last_progress_at": last_progress_at.isoformat(),
"cooldown_until": cooldown_until if in_cooldown else None,
"mode": mode, "action": action,
})
elapsed = (f" (מאז הקודם: +{d_hal} הלכות, +{d_ck} מקטעים, +{d_done} תיקים)"
if prev.get("tick_at") else "")
bu = f" burst→{burst_until.astimezone(IDT):%a %d/%m %H:%M}" if burst else ""
print(f"🕒 {now.astimezone(IDT):%H:%M:%S IDT} | מצב: {mode} ({phase}){bu} | פעולה: {action}")
print(f" תור: pending={pending} processing={processing} done={done} failed={failed}")
print(f" staging: halachot={hal} checkpointed_chunks={ck} pending_review={snap['pending_review']}{elapsed}")
print(f" process: pm2={status} progress_age={int(progress_age)}s staging_ok={staging_ok}")
if proc_cases:
print(f" בעיבוד עכשיו: {', '.join(proc_cases)}")
for n in notes:
print(f"{n}")
print("JSON:" + json.dumps({
"ok": True, "mode": mode, "phase": phase, "action": action, "next_wake_sec": next_wake,
"burst_until": burst_until.isoformat() if burst_until else None,
"pending": pending, "processing": processing, "done": done, "failed": failed,
"halachot_total": hal, "checkpointed_chunks": ck,
"delta_halachot": d_hal, "delta_checkpoints": d_ck, "delta_done": d_done,
"pm2": status, "progress_age_sec": int(progress_age),
"staging_ok": staging_ok, "rate_limited": in_cooldown, "cooldown_until": cooldown_until,
}, ensure_ascii=False))
def cmd_status():
bu = db_get_burst()
now = _now_utc()
burst = bool(bu and now < bu)
st = load_state()
print(f"מצב: {'BURST' if burst else 'night'}"
+ (f" → עד {bu.astimezone(IDT):%a %d/%m %H:%M IDT}" if burst else " (חלון 23:0005:00)"))
if st:
print(f"טיק אחרון: {st.get('tick_at','')} | mode={st.get('mode')} action={st.get('action')}")
print(f"תור: pending={st.get('pending')} processing={st.get('processing')} "
f"done={st.get('done')} | staging halachot={st.get('halachot_total')}")
print(f"דריינר pm2: {pm2_status()}")
def main():
ap = argparse.ArgumentParser(description="halacha-drain supervisor")
ap.add_argument("cmd", nargs="?", default="tick",
choices=["tick", "burst-on", "burst-off", "status"])
ap.add_argument("--until", help="burst deadline override, ISO or 'YYYY-MM-DD HH:MM' (IDT)")
a = ap.parse_args()
if a.cmd == "burst-on":
if a.until:
try:
until = datetime.fromisoformat(a.until)
except ValueError:
until = datetime.strptime(a.until, "%Y-%m-%d %H:%M")
if until.tzinfo is None:
until = until.replace(tzinfo=IDT)
else:
until = next_saturday_18_utc()
db_set_burst(until.isoformat())
print(f"✅ BURST הופעל — רץ עד {until.astimezone(IDT):%A %d/%m %H:%M IDT} (או burst-off).")
tick() # kick off immediately
elif a.cmd == "burst-off":
db_set_burst(None)
stopped = stop_drain()
print(f"🛑 BURST כובה. הדריינר {'נעצר' if stopped else 'לא נעצר (כבר עצור)'}; "
f"חזרה למצב לילה (23:0005:00).")
tick()
elif a.cmd == "status":
cmd_status()
else:
tick()
if __name__ == "__main__":
main()