legal-ai/scripts/halacha_drain_supervisor.py
Chaim a44827c3dd
fix(operations): disabling the halacha drain now stops a running process immediately
The /operations "disabled" toggle only wrote drain_controls.disabled, which the
drain checks at STARTUP — so a drain already mid-run kept going until the queue
emptied or the night window closed. Disabling did not stop a running drain.

Three layers, immediate + backstops:
- web/app.py operations_drain_toggle: on disable, also stop the running process
  immediately via the host pm2 bridge (_ops_pm2_control). Best-effort — a bridge
  failure doesn't fail the toggle.
- halacha_drain_supervisor.py: each tick now reads the disabled flag (added to
  db_snapshot) and, when set, stops the drain and never re-triggers it —
  regardless of burst/window. Backstop if the UI path failed (≤ one tick).
- drain_halacha_queue.py: re-check is_drain_disabled at the top of every round,
  so a drain disabled mid-run halts at the next round boundary. Per-chunk
  checkpoints mean the in-flight case loses nothing.
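
A minimal sketch of the round-boundary re-check described in the last bullet, under stated assumptions: `run_rounds`, `is_disabled`, and `process` are hypothetical names for illustration, not the actual API of drain_halacha_queue.py. It only shows the control flow: the flag is consulted before every round, so a mid-run disable halts the loop at the next boundary.

```python
from typing import Callable, Iterable, List


def run_rounds(rounds: Iterable[str],
               is_disabled: Callable[[], bool],
               process: Callable[[str], None]) -> List[str]:
    """Process rounds in order, re-checking the disable flag before each one."""
    done: List[str] = []
    for r in rounds:
        # Re-check at the top of every round, not only at startup.
        if is_disabled():
            break
        process(r)
        done.append(r)
    return done


# Usage: the flag flips to disabled while the second round is running.
processed: List[str] = []
flag = {"disabled": False}


def fake_process(r: str) -> None:
    processed.append(r)
    if len(processed) == 2:
        flag["disabled"] = True  # simulates the /operations toggle mid-run


result = run_rounds(["r1", "r2", "r3", "r4"], lambda: flag["disabled"], fake_process)
print(result)
```

The in-flight round still finishes; only the following rounds are skipped, which matches the "halts at the next round boundary" behaviour stated above.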

SCRIPTS.md updated for both drain and supervisor.

Invariants: G1 (fix at source — the disable control honoured along every path,
not just at startup); G2 (no parallel control path — same drain_controls flag).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-13 09:03:07 +00:00

455 lines
20 KiB
Python
#!/usr/bin/env python3
"""Halacha-drain supervisor — permanent health manager for legal-halacha-drain.
Babysits the EXISTING pm2 drain so the halacha-extraction backlog gets worked
reliably. Takes ZERO Claude quota itself (only the drain calls Opus); it just
reads DB/logs/pm2 and pokes the drain via the established run-now mechanism.

Subcommands (argparse):
  tick       (default; what the pm2 cron fires every 15 min) run ONE health tick
  burst-on   start a daytime BURST: drain runs continuously until the upcoming
             Saturday 18:00 IDT (or --until). MANUAL ONLY; kicks off immediately.
  burst-off  stop the burst now (revert to night mode) and halt the running drain
  status     print current mode + queue + drain state (no action)

BURST is the chair's manual "run continuously now" window. Source of truth is the
DB (`drain_controls.burst_until`, keyed by the drain's pm2 name): the SAME value
the /operations page reads/writes (G1, single source; G2, no parallel control
path). burst-on/off here and the /operations toggle are equivalent front-ends.

One tick: reads queue + per-chunk staging + burst_until from the DB, pm2 status,
log tails →
  • re-triggers the one-shot drain when idle and the queue is non-empty
  • restarts a HUNG run (online but no new chunk-checkpoint for > 25 min, the
    REAL liveness signal; the out-log only updates when a whole CASE finishes)
  • backs off on rate-limit (claude_session 429) until the CLI's parsed reset;
    a 429 only counts when the log is FRESH (an hours-old 429 is historical)
  • verifies crash-safe per-chunk staging is committing (nothing lost)

TWO MODES (never self-stops):
  • burst (while burst_until is set and in the future): window LIFTED
    (HALACHA_DRAIN_WINDOW_START==END==0), drain runs all day.
  • night (otherwise): the drain's normal 23:00-05:00 IDT window; the supervisor
    stays on as the permanent nightly-drain health manager.

Burst auto-EXPIRES at its deadline (DB cleared + running drain stopped) so it
never bleeds into a fresh week's quota uninvited.
"""
import argparse
import json
import os
import re
import subprocess
from datetime import datetime, timedelta, timezone
from glob import glob
REPO = "/home/chaim/legal-ai"
RUNTIME_DIR = "/home/chaim/halacha-drain-monitor" # state (outside repo)
STATE = os.path.join(RUNTIME_DIR, "state.json")
DRAIN = "legal-halacha-drain"
OUT_LOG = "/home/chaim/.pm2/logs/legal-halacha-drain-out.log"
ERR_LOG = "/home/chaim/.pm2/logs/legal-halacha-drain-error.log"
VENV_PY = os.path.join(REPO, "mcp-server/.venv/bin/python")
STUCK_SILENCE_SEC = 1500 # 25 min with no new chunk-checkpoint while online → hung
WEEKLY_GAP_HOURS = 6 # reset further than this → treat as weekly, not 5h
IDT = timezone(timedelta(hours=3)) # Israel summer time (IDT, UTC+3) — display only
NIGHT_START, NIGHT_END = 23, 5 # the drain's normal window (IDT hours)
def _now_utc():
    return datetime.now(timezone.utc)


def pm2_bin():
    for c in ["pm2", *glob("/home/chaim/.nvm/versions/node/*/bin/pm2")]:
        try:
            subprocess.run([c, "-v"], capture_output=True, timeout=10)
            return c
        except Exception:
            continue
    return "pm2"


PM2 = pm2_bin()
_ENV = {**os.environ, "HOME": "/home/chaim"}
# ── DB access (via the repo venv; the module self-configures) ────────────────
def _venv_py(code: str, timeout: int = 120) -> str:
    r = subprocess.run([VENV_PY, "-c", code], capture_output=True, text=True,
                       cwd=REPO, timeout=timeout, env=_ENV)
    if r.returncode != 0:
        raise RuntimeError((r.stderr or r.stdout)[:300])
    return r.stdout.strip()


def db_snapshot() -> dict:
    """Queue + staging counts + burst_until + disabled flag, in one round-trip."""
    code = (
        "import sys,os,asyncio,json\n"
        "sys.path.insert(0,'mcp-server/src'); os.environ.setdefault('HOME','/home/chaim')\n"
        "from legal_mcp.services import db\n"
        "async def m():\n"
        "  pool=await db.get_pool()\n"
        "  async with pool.acquire() as c:\n"
        "    st={r['halacha_extraction_status'] or 'unknown':r['n'] for r in await c.fetch(\"SELECT halacha_extraction_status,count(*) n FROM case_law GROUP BY 1\")}\n"
        "    procs=[r['case_number'] for r in await c.fetch(\"SELECT case_number FROM case_law WHERE halacha_extraction_status='processing' ORDER BY halacha_extraction_requested_at NULLS LAST LIMIT 5\")]\n"
        "    hal=await c.fetchval('SELECT count(*) FROM halachot')\n"
        "    ck=await c.fetchval('SELECT count(*) FROM precedent_chunks WHERE halacha_extracted_at IS NOT NULL')\n"
        "    pend_rev=await c.fetchval(\"SELECT count(*) FROM halachot WHERE review_status='pending_review'\")\n"
        "    bu=await c.fetchval(\"SELECT burst_until FROM drain_controls WHERE name='legal-halacha-drain'\")\n"
        "    dis=await c.fetchval(\"SELECT disabled FROM drain_controls WHERE name='legal-halacha-drain'\")\n"
        "    print(json.dumps({'status_counts':st,'processing_cases':procs,'halachot_total':hal,'checkpointed_chunks':ck,'pending_review':pend_rev,'burst_until':bu.isoformat() if bu else None,'disabled':bool(dis)}))\n"
        "asyncio.run(m())\n"
    )
    # The last stdout line is the JSON payload; earlier lines may be import noise.
    return json.loads(_venv_py(code).splitlines()[-1])
_SET_BURST_SQL = (
    "INSERT INTO drain_controls (name, burst_until, updated_at) "
    "VALUES ('legal-halacha-drain', $1, now()) "
    "ON CONFLICT (name) DO UPDATE SET burst_until = $1, updated_at = now()"
)


def db_set_burst(until_iso):
    # Raw SQL via get_pool (NOT db.set_drain_burst): the host repo checkout can lag
    # main, so the supervisor depends only on get_pool + the burst_until column.
    # The column is the shared contract with the container-side /operations API.
    code = (
        "import sys,os,asyncio\n"
        "sys.path.insert(0,'mcp-server/src'); os.environ.setdefault('HOME','/home/chaim')\n"
        "from datetime import datetime\n"
        "from legal_mcp.services import db\n"
        f"v={until_iso!r}\n"
        f"SQL={_SET_BURST_SQL!r}\n"
        "async def m():\n"
        "  pool=await db.get_pool()\n"
        "  async with pool.acquire() as c:\n"
        "    await c.execute(SQL, datetime.fromisoformat(v) if v else None)\n"
        "asyncio.run(m())\n"
    )
    _venv_py(code, timeout=60)


def db_get_burst():
    code = (
        "import sys,os,asyncio\n"
        "sys.path.insert(0,'mcp-server/src'); os.environ.setdefault('HOME','/home/chaim')\n"
        "from legal_mcp.services import db\n"
        "async def m():\n"
        "  pool=await db.get_pool()\n"
        "  async with pool.acquire() as c:\n"
        "    v=await c.fetchval(\"SELECT burst_until FROM drain_controls WHERE name='legal-halacha-drain'\")\n"
        "    print(v.isoformat() if v else '')\n"
        "asyncio.run(m())\n"
    )
    out = _venv_py(code, timeout=60).strip()
    return datetime.fromisoformat(out) if out else None
def next_saturday_18_utc():
    now_idt = _now_utc().astimezone(IDT)
    days = (5 - now_idt.weekday()) % 7  # Mon=0..Sat=5..Sun=6
    cand = now_idt.replace(hour=18, minute=0, second=0, microsecond=0) + timedelta(days=days)
    if cand <= now_idt:
        cand += timedelta(days=7)
    return cand.astimezone(timezone.utc)
# ── state ────────────────────────────────────────────────────────────────────
def load_state():
    try:
        with open(STATE) as f:
            return json.load(f)
    except Exception:
        return {}


def save_state(s):
    os.makedirs(RUNTIME_DIR, exist_ok=True)
    # Write to a temp file and rename, so a crash never leaves a half-written state.
    tmp = STATE + ".tmp"
    with open(tmp, "w") as f:
        json.dump(s, f, ensure_ascii=False, indent=2)
    os.replace(tmp, STATE)
# ── pm2 / logs ────────────────────────────────────────────────────────────────
def pm2_status():
    try:
        r = subprocess.run([PM2, "jlist"], capture_output=True, text=True, timeout=30)
        for p in json.loads(r.stdout):
            if p.get("name") == DRAIN:
                return p.get("pm2_env", {}).get("status", "unknown")
    except Exception as e:
        return f"err:{e}"
    return "absent"


def log_age_sec():
    try:
        return _now_utc().timestamp() - os.path.getmtime(OUT_LOG)
    except Exception:
        return None


def tail(path, n=120):
    try:
        with open(path, errors="replace") as f:
            return f.readlines()[-n:]
    except Exception:
        return []
def scan_rate_limit():
    """(recent_429, reset_dt_utc|None). A 429 counts as recent only if it is the
    LAST drain outcome (after the last 'completed' line)."""
    lines = tail(OUT_LOG, 200) + tail(ERR_LOG, 120)
    last_429, last_ok, reset_dt = -1, -1, None
    for i, ln in enumerate(lines):
        if 'api_error_status":429' in ln or "hit your session limit" in ln or "usage limit" in ln.lower():
            last_429 = i
            m = re.search(r"resets\s+(\d{1,2}):(\d{2})\s*([ap]m)\s*\(UTC\)", ln, re.I)
            if m:
                # Convert the 12-hour clock in the log line to a 24-hour hour.
                hh, mm, ap = int(m.group(1)), int(m.group(2)), m.group(3).lower()
                if ap == "pm" and hh != 12:
                    hh += 12
                if ap == "am" and hh == 12:
                    hh = 0
                now = _now_utc()
                cand = now.replace(hour=hh, minute=mm, second=0, microsecond=0)
                # A reset time already in the past means "tomorrow at that time".
                if cand <= now - timedelta(minutes=1):
                    cand += timedelta(days=1)
                reset_dt = cand
        if "completed stored=" in ln or ("[round" in ln and "processed=" in ln):
            last_ok = i
    return (last_429 > last_ok and last_429 >= 0), reset_dt
# ── actions ───────────────────────────────────────────────────────────────────
def trigger_drain(win_start, win_end):
    """Restart the one-shot drain, pushing its window env (idempotent, reboot-safe)."""
    env = {**_ENV, "HALACHA_DRAIN_WINDOW_START": str(win_start),
           "HALACHA_DRAIN_WINDOW_END": str(win_end)}
    r = subprocess.run([PM2, "restart", DRAIN, "--update-env"],
                       capture_output=True, text=True, env=env, timeout=60)
    return r.returncode == 0, (r.stderr or r.stdout)[:200]


def stop_drain():
    r = subprocess.run([PM2, "stop", DRAIN], capture_output=True, text=True, timeout=60)
    return r.returncode == 0
# ── the tick ──────────────────────────────────────────────────────────────────
def tick():
    now = _now_utc()
    prev = load_state()
    notes = []
    try:
        snap = db_snapshot()
    except Exception as e:
        save_state({**prev, "tick_at": now.isoformat(), "mode": "db_error"})
        print("⚠️ Failed to read the DB:", e)
        print("JSON:" + json.dumps({"ok": False, "error": str(e), "action": "none",
                                    "mode": "db_error", "next_wake_sec": 900}, ensure_ascii=False))
        return
    # /operations "disabled" switch: highest priority. The drain self-guards at
    # startup, but a process mid-run wouldn't notice the flag, and the cron keeps
    # firing it; so when disabled we stop any running drain and never re-trigger,
    # regardless of burst/window. (The UI toggle also stops it immediately via the
    # bridge; this is the backstop + the "don't re-ignite a disabled drain" gate.)
    if snap.get("disabled"):
        stopped = stop_drain()
        save_state({**prev, "tick_at": now.isoformat(), "mode": "disabled",
                    "action": "stopped-disabled" if stopped else "already-stopped"})
        print(f"🛑 {now.astimezone(IDT):%H:%M:%S IDT} | mode: disabled | "
              f"marked inactive in /operations; the drain {'was stopped' if stopped else 'is already stopped'}.")
        print("JSON:" + json.dumps(
            {"ok": True, "mode": "disabled",
             "action": "stopped-disabled" if stopped else "already-stopped",
             "next_wake_sec": 900}, ensure_ascii=False))
        return
    # burst state from the DB + auto-expiry (manual on; auto-off at the deadline)
    burst_until = datetime.fromisoformat(snap["burst_until"]) if snap.get("burst_until") else None
    if burst_until and now >= burst_until:
        try:
            db_set_burst(None)
        except Exception as e:
            notes.append(f"(warning: clearing burst in the DB failed: {e})")
        stop_drain()
        notes.append(f"The burst ended ({burst_until.astimezone(IDT):%a %H:%M IDT}); back to night mode, the drain was stopped.")
        burst_until = None
    burst = bool(burst_until and now < burst_until)
    idt_hour = now.astimezone(IDT).hour
    # The night window wraps past midnight, hence "or" rather than "and".
    night_open = (idt_hour >= NIGHT_START) or (idt_hour < NIGHT_END)
    window_open = burst or night_open
    win = (0, 0) if burst else (NIGHT_START, NIGHT_END)
    phase = "burst" if burst else "night"
    sc = snap["status_counts"]
    pending = int(sc.get("pending", 0))
    processing = int(sc.get("processing", 0))
    done = int(sc.get("completed", 0))
    failed = int(sc.get("failed", 0))
    hal = snap["halachot_total"]
    ck = snap["checkpointed_chunks"]
    proc_cases = snap["processing_cases"]
    status = pm2_status()
    age = log_age_sec()
    rl_recent, reset_dt = scan_rate_limit()
    d_hal = hal - prev.get("halachot_total", hal)
    d_ck = ck - prev.get("checkpointed_chunks", ck)
    d_done = done - prev.get("done", done)
    # cooldown: fresh-gated; honor a stored future reset
    fresh = (age is not None and age < 1800)
    rl_active = bool(rl_recent and fresh)
    cd_dt = None
    if rl_active and reset_dt:
        cd_dt = reset_dt
    elif prev.get("cooldown_until"):
        try:
            cd_dt = datetime.fromisoformat(prev["cooldown_until"])
        except Exception:
            cd_dt = None
    cooldown_until = cd_dt.isoformat() if cd_dt else None
    in_cooldown = bool(cd_dt and now < cd_dt)
    weekly = bool(cd_dt and (cd_dt - now) > timedelta(hours=WEEKLY_GAP_HOURS))
    # progress-based liveness (chunk checkpoints, NOT log mtime)
    if ck > prev.get("checkpointed_chunks", ck):
        last_progress_at = now
    else:
        lp = prev.get("last_progress_at")
        try:
            last_progress_at = datetime.fromisoformat(lp) if lp else now
        except Exception:
            last_progress_at = now
    progress_age = (now - last_progress_at).total_seconds()
    hung = (status == "online" and prev.get("last_progress_at") is not None
            and progress_age > STUCK_SILENCE_SEC)
    # ── decide ONE action ──
    action, detail, mode = "none", "", "running"
    if pending == 0 and processing == 0:
        mode = "done"
        notes.append("The queue is empty; nothing to extract.")
    elif in_cooldown:
        mode = "weekly_exhausted" if weekly else "ratelimited"
        action = "hold"
        notes.append(f"rate-limit active; reset ~{cd_dt.astimezone(IDT):%H:%M IDT}.")
    elif hung:
        mode = "hung"
        ok, detail = trigger_drain(*win)
        action = "restart-hung" if ok else "restart-hung-failed"
        notes.append(f"Hang detected ({int(progress_age)}s without a new checkpoint); restart performed.")
    elif status == "online":
        mode = "running"
        notes.append(f"Running ({len(proc_cases)} processing, last progress {int(progress_age)}s ago).")
    elif not window_open:
        mode = "idle_off_window"
        notes.append(f"Outside the night window ({NIGHT_START}:00-0{NIGHT_END}:00); waiting, {pending} in queue.")
    else:
        ok, detail = trigger_drain(*win)
        action = "triggered" if ok else "trigger-failed"
        notes.append(f"The drain was idle; triggered ({phase})." if ok else f"Trigger failed: {detail}")
    staging_ok = not (d_done > 0 and d_ck == 0)
    if not staging_ok:
        notes.append("⚠️ Cases completed but checkpoints did not advance; check staging.")
    if mode in ("ratelimited", "weekly_exhausted") and cd_dt:
        # Wake shortly after the reset, clamped to between 5 minutes and 1 hour.
        next_wake = max(300, min(int((cd_dt - now).total_seconds()) + 180, 3600))
    elif mode in ("done", "idle_off_window"):
        next_wake = 1800
    else:
        next_wake = 900
    save_state({
        "tick_at": now.isoformat(), "phase": phase,
        "burst_until": burst_until.isoformat() if burst_until else None,
        "halachot_total": hal, "checkpointed_chunks": ck, "done": done,
        "pending": pending, "processing": processing, "processing_cases": proc_cases,
        "last_progress_at": last_progress_at.isoformat(),
        "cooldown_until": cooldown_until if in_cooldown else None,
        "mode": mode, "action": action,
    })
    elapsed = (f" (since previous: +{d_hal} halachot, +{d_ck} chunks, +{d_done} cases)"
               if prev.get("tick_at") else "")
    bu = f" burst→{burst_until.astimezone(IDT):%a %d/%m %H:%M}" if burst else ""
    print(f"🕒 {now.astimezone(IDT):%H:%M:%S IDT} | mode: {mode} ({phase}){bu} | action: {action}")
    print(f"   queue: pending={pending} processing={processing} done={done} failed={failed}")
    print(f"   staging: halachot={hal} checkpointed_chunks={ck} pending_review={snap['pending_review']}{elapsed}")
    print(f"   process: pm2={status} progress_age={int(progress_age)}s staging_ok={staging_ok}")
    if proc_cases:
        print(f"   processing now: {', '.join(proc_cases)}")
    for n in notes:
        print(f"{n}")
    print("JSON:" + json.dumps({
        "ok": True, "mode": mode, "phase": phase, "action": action, "next_wake_sec": next_wake,
        "burst_until": burst_until.isoformat() if burst_until else None,
        "pending": pending, "processing": processing, "done": done, "failed": failed,
        "halachot_total": hal, "checkpointed_chunks": ck,
        "delta_halachot": d_hal, "delta_checkpoints": d_ck, "delta_done": d_done,
        "pm2": status, "progress_age_sec": int(progress_age),
        "staging_ok": staging_ok, "rate_limited": in_cooldown, "cooldown_until": cooldown_until,
    }, ensure_ascii=False))
def cmd_status():
    bu = db_get_burst()
    now = _now_utc()
    burst = bool(bu and now < bu)
    st = load_state()
    print(f"mode: {'BURST' if burst else 'night'}"
          + (f" → until {bu.astimezone(IDT):%a %d/%m %H:%M IDT}" if burst else " (window 23:00-05:00)"))
    if st:
        print(f"last tick: {st.get('tick_at','')} | mode={st.get('mode')} action={st.get('action')}")
        print(f"queue: pending={st.get('pending')} processing={st.get('processing')} "
              f"done={st.get('done')} | staging halachot={st.get('halachot_total')}")
    print(f"drain pm2: {pm2_status()}")
def main():
    ap = argparse.ArgumentParser(description="halacha-drain supervisor")
    ap.add_argument("cmd", nargs="?", default="tick",
                    choices=["tick", "burst-on", "burst-off", "status"])
    ap.add_argument("--until", help="burst deadline override, ISO or 'YYYY-MM-DD HH:MM' (IDT)")
    a = ap.parse_args()
    if a.cmd == "burst-on":
        if a.until:
            try:
                until = datetime.fromisoformat(a.until)
            except ValueError:
                until = datetime.strptime(a.until, "%Y-%m-%d %H:%M")
            # A naive deadline is interpreted as IDT wall-clock time.
            if until.tzinfo is None:
                until = until.replace(tzinfo=IDT)
        else:
            until = next_saturday_18_utc()
        db_set_burst(until.isoformat())
        print(f"✅ BURST enabled; running until {until.astimezone(IDT):%A %d/%m %H:%M IDT} (or burst-off).")
        tick()  # kick off immediately
    elif a.cmd == "burst-off":
        db_set_burst(None)
        stopped = stop_drain()
        print(f"🛑 BURST turned off. The drain {'was stopped' if stopped else 'was not stopped (already stopped)'}; "
              f"back to night mode (23:00-05:00).")
        tick()
    elif a.cmd == "status":
        cmd_status()
    else:
        tick()


if __name__ == "__main__":
    main()