Files
legal-ai/scripts/halacha_drain_supervisor.py
Chaim 75a1b23972
All checks were successful
G12 Leak-Guard / leak-guard (pull_request) Successful in 9s
fix(supervisor): burst set/get via raw SQL, not new db helpers (host-lag-proof)
The host pm2 supervisor imports legal_mcp.services.db from the host repo checkout,
which can lag main by many commits. Depending on the just-added db.set_drain_burst/
get_drain_burst would require the host checkout to be current. Use raw SQL via the
stable db.get_pool() instead — the supervisor now depends only on get_pool + the
drain_controls.burst_until column (the shared contract with the /operations API).
The container-side API keeps using the typed helpers (it ships the code in-image).

Invariants: G1/G2 unchanged (same single DB column, no parallel path).
2026-06-12 11:16:38 +00:00

437 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Halacha-drain supervisor — permanent health manager for legal-halacha-drain.
Babysits the EXISTING pm2 drain so the halacha-extraction backlog gets worked
reliably. Takes ZERO Claude quota itself (only the drain calls Opus); it just
reads DB/logs/pm2 and pokes the drain via the established run-now mechanism.
Subcommands (argparse):
tick (default — what the pm2 cron fires every 15 min) run ONE health tick
burst-on start a daytime BURST — drain runs continuously until the upcoming
Saturday 18:00 IDT (or --until). MANUAL ONLY; kicks off immediately.
burst-off stop the burst now (revert to night mode) and halt the running drain
status print current mode + queue + drain state (no action)
BURST is the chair's manual "run continuously now" window. Source of truth is the
DB (`drain_controls.burst_until`, keyed by the drain's pm2 name) — the SAME value
the /operations page reads/writes (G1, single source; G2, no parallel control
path). burst-on/off here and the /operations toggle are equivalent front-ends.
One tick: reads queue + per-chunk staging + burst_until from the DB, pm2 status,
log tails →
• re-triggers the one-shot drain when idle and the queue is non-empty
• restarts a HUNG run (online but no new chunk-checkpoint for > 25 min — the
REAL liveness signal; the out-log only updates when a whole CASE finishes)
• backs off on rate-limit (claude_session 429) until the CLI's parsed reset;
a 429 only counts when the log is FRESH (an hours-old 429 is historical)
• verifies crash-safe per-chunk staging is committing (nothing lost)
TWO MODES (never self-stops):
• burst — while burst_until is set and in the future: window LIFTED
(HALACHA_DRAIN_WINDOW_START==END==0), drain runs all day.
• night — otherwise: the drain's normal 23:00–05:00 IDT window; the supervisor
stays on as the permanent nightly-drain health manager.
Burst auto-EXPIRES at its deadline (DB cleared + running drain stopped) so it
never bleeds into a fresh week's quota uninvited.
"""
import argparse
import json
import os
import re
import subprocess
from datetime import datetime, timedelta, timezone
from glob import glob
# Fixed host paths: the supervisor runs on the host as user chaim.
REPO = "/home/chaim/legal-ai"
RUNTIME_DIR = "/home/chaim/halacha-drain-monitor"  # supervisor state dir (kept outside the repo)
STATE = os.path.join(RUNTIME_DIR, "state.json")
DRAIN = "legal-halacha-drain"  # pm2 process name of the supervised drain
OUT_LOG = "/home/chaim/.pm2/logs/legal-halacha-drain-out.log"
ERR_LOG = "/home/chaim/.pm2/logs/legal-halacha-drain-error.log"
VENV_PY = os.path.join(REPO, "mcp-server/.venv/bin/python")  # runs the DB helper snippets
STUCK_SILENCE_SEC = 1500  # 25 min with no new chunk-checkpoint while online → hung
WEEKLY_GAP_HOURS = 6  # reset further than this → treat as weekly, not 5h
# Fixed UTC+3 offset (Israel summer time, IDT). NOT DST-aware, and NOT display-only:
# it also drives the night-window check in tick(), the Saturday-18:00 burst
# deadline and the interpretation of a naive --until. During winter time (UTC+2)
# those are one hour off. TODO(review): consider zoneinfo "Asia/Jerusalem" —
# only together with whatever time zone the drain's own window check uses.
IDT = timezone(timedelta(hours=3))
NIGHT_START, NIGHT_END = 23, 5  # the drain's normal window (IDT hours; wraps midnight)
def _now_utc():
return datetime.now(timezone.utc)
def pm2_bin():
    """Return the first pm2 executable that answers ``pm2 -v`` successfully.

    Candidates, in order: ``pm2`` on PATH, then every nvm-installed pm2 under
    /home/chaim/.nvm (sorted, so the choice is deterministic across runs).
    A candidate that cannot be executed, times out, or exits non-zero is
    skipped. Previously a candidate that existed but failed was accepted,
    presumably e.g. an nvm shim whose ``node`` is not on PATH.
    Falls back to plain ``"pm2"`` when nothing works.
    """
    candidates = ["pm2", *sorted(glob("/home/chaim/.nvm/versions/node/*/bin/pm2"))]
    for candidate in candidates:
        try:
            probe = subprocess.run([candidate, "-v"], capture_output=True, timeout=10)
        except Exception:
            # Not found / not executable / timed out: try the next one.
            continue
        if probe.returncode == 0:
            return candidate
    return "pm2"
# Resolved once, at import time — importing this module runs the `pm2 -v` probes.
PM2 = pm2_bin()
# Child-process environment: the caller's env with HOME forced to /home/chaim,
# presumably so pm2 and the venv helpers resolve chaim's home (and ~/.pm2)
# even when the supervisor is started under a different HOME.
_ENV = {**os.environ, "HOME": "/home/chaim"}
# ── DB access (via the repo venv; the module self-configures) ────────────────
def _venv_py(code: str, timeout: int = 120) -> str:
    """Run ``code`` with the repo venv's interpreter and return its stripped stdout.

    The child runs with cwd=REPO and env=_ENV.

    Raises:
        RuntimeError: the child exited non-zero. The message carries the TAIL
            (last 300 chars) of stderr, or of stdout if stderr is empty. A
            Python traceback ends with the actual exception line, which the
            old first-300-chars slice cut off.
        subprocess.TimeoutExpired: the child ran longer than ``timeout`` seconds.
    """
    r = subprocess.run([VENV_PY, "-c", code], capture_output=True, text=True,
                       cwd=REPO, timeout=timeout, env=_ENV)
    if r.returncode != 0:
        msg = (r.stderr or r.stdout).strip()
        raise RuntimeError(msg[-300:] or f"venv helper exited with status {r.returncode}")
    return r.stdout.strip()
def db_snapshot() -> dict:
    """Queue + staging counts + burst_until, in one venv round-trip.

    Returns a dict with these keys:
        status_counts: {halacha_extraction_status (NULL → 'unknown'): count}
        processing_cases: up to 5 case_numbers currently 'processing'
        halachot_total, checkpointed_chunks, pending_review: ints
        burst_until: ISO string or None

    Raises:
        RuntimeError: the helper failed, or printed nothing.
        ValueError: the last line was not JSON.
        subprocess.TimeoutExpired: the helper timed out.
    tick() treats any of these as a db_error tick.
    """
    # The embedded script must carry real indentation (function body, then the
    # `async with` body); a flattened script fails with IndentationError.
    code = (
        "import sys,os,asyncio,json\n"
        "sys.path.insert(0,'mcp-server/src'); os.environ.setdefault('HOME','/home/chaim')\n"
        "from legal_mcp.services import db\n"
        "async def m():\n"
        "    pool=await db.get_pool()\n"
        "    async with pool.acquire() as c:\n"
        "        st={r['halacha_extraction_status'] or 'unknown':r['n'] for r in await c.fetch(\"SELECT halacha_extraction_status,count(*) n FROM case_law GROUP BY 1\")}\n"
        "        procs=[r['case_number'] for r in await c.fetch(\"SELECT case_number FROM case_law WHERE halacha_extraction_status='processing' ORDER BY halacha_extraction_requested_at NULLS LAST LIMIT 5\")]\n"
        "        hal=await c.fetchval('SELECT count(*) FROM halachot')\n"
        "        ck=await c.fetchval('SELECT count(*) FROM precedent_chunks WHERE halacha_extracted_at IS NOT NULL')\n"
        "        pend_rev=await c.fetchval(\"SELECT count(*) FROM halachot WHERE review_status='pending_review'\")\n"
        "        bu=await c.fetchval(\"SELECT burst_until FROM drain_controls WHERE name='legal-halacha-drain'\")\n"
        "        print(json.dumps({'status_counts':st,'processing_cases':procs,'halachot_total':hal,'checkpointed_chunks':ck,'pending_review':pend_rev,'burst_until':bu.isoformat() if bu else None}))\n"
        "asyncio.run(m())\n"
    )
    out = _venv_py(code)
    if not out:
        raise RuntimeError("db_snapshot: venv helper printed nothing")
    # The JSON payload is the script's last print; only the last line is parsed,
    # so any earlier output (e.g. printed while importing db) is ignored.
    return json.loads(out.splitlines()[-1])
_SET_BURST_SQL = (
"INSERT INTO drain_controls (name, burst_until, updated_at) "
"VALUES ('legal-halacha-drain', $1, now()) "
"ON CONFLICT (name) DO UPDATE SET burst_until = $1, updated_at = now()"
)
def db_set_burst(until_iso):
    """Upsert drain_controls.burst_until for the drain, or clear it.

    ``until_iso`` is an ISO-8601 string; None or '' clears the burst (NULL).

    Raw SQL via get_pool (NOT db.set_drain_burst): the host repo checkout can
    lag main, so the supervisor depends only on get_pool + the burst_until
    column. That column is the shared contract with the container-side
    /operations API.

    Raises:
        ValueError: ``until_iso`` is not ISO-8601. Checked locally, before
            spawning the helper, so the error is clear rather than a remote
            traceback.
        RuntimeError / subprocess.TimeoutExpired: the venv helper failed.
    """
    if until_iso:
        datetime.fromisoformat(until_iso)  # fail fast on a malformed value
    # Values are embedded via repr(), which yields valid Python string literals.
    # The script needs real indentation, or it fails with IndentationError.
    code = (
        "import sys,os,asyncio\n"
        "sys.path.insert(0,'mcp-server/src'); os.environ.setdefault('HOME','/home/chaim')\n"
        "from datetime import datetime\n"
        "from legal_mcp.services import db\n"
        f"v={until_iso!r}\n"
        f"SQL={_SET_BURST_SQL!r}\n"
        "async def m():\n"
        "    pool=await db.get_pool()\n"
        "    async with pool.acquire() as c:\n"
        "        await c.execute(SQL, datetime.fromisoformat(v) if v else None)\n"
        "asyncio.run(m())\n"
    )
    _venv_py(code, timeout=60)
def db_get_burst():
    """Return drain_controls.burst_until for the drain as an aware datetime, or None.

    Like db_snapshot(), only the helper's LAST stdout line is parsed, so any
    earlier output does not break the ISO parse. A naive value is assumed to
    be UTC, so callers can compare it with the aware _now_utc().
    TODO(review): confirm that the column is timestamptz.

    Raises RuntimeError / subprocess.TimeoutExpired if the venv helper fails.
    """
    # The embedded script needs real indentation, or it fails with IndentationError.
    code = (
        "import sys,os,asyncio\n"
        "sys.path.insert(0,'mcp-server/src'); os.environ.setdefault('HOME','/home/chaim')\n"
        "from legal_mcp.services import db\n"
        "async def m():\n"
        "    pool=await db.get_pool()\n"
        "    async with pool.acquire() as c:\n"
        "        v=await c.fetchval(\"SELECT burst_until FROM drain_controls WHERE name='legal-halacha-drain'\")\n"
        "        print(v.isoformat() if v else '')\n"
        "asyncio.run(m())\n"
    )
    out = _venv_py(code, timeout=60)
    last = out.splitlines()[-1].strip() if out else ""
    if not last:
        return None
    value = datetime.fromisoformat(last)
    if value.tzinfo is None:
        value = value.replace(tzinfo=timezone.utc)
    return value
def next_saturday_18_utc():
    """Return, in UTC, the first Saturday 18:00 IDT strictly after now.

    On a Saturday before 18:00 this is today. From 18:00 onwards it is the
    following Saturday.
    """
    local_now = _now_utc().astimezone(IDT)
    saturday = 5  # datetime.weekday(): Mon=0 … Sat=5, Sun=6
    days_ahead = (saturday - local_now.weekday()) % 7
    deadline = local_now.replace(hour=18, minute=0, second=0, microsecond=0)
    deadline = deadline + timedelta(days=days_ahead)
    if not deadline > local_now:
        deadline = deadline + timedelta(days=7)
    return deadline.astimezone(timezone.utc)
# ── state ────────────────────────────────────────────────────────────────────
def load_state():
    """Return the persisted supervisor state dict from STATE.

    Returns {} when the file is missing (first run) or unreadable/invalid JSON.
    """
    try:
        with open(STATE) as fh:
            state = json.load(fh)
    except Exception:
        state = {}
    return state
def save_state(s):
    """Persist ``s`` as pretty-printed JSON at STATE, creating RUNTIME_DIR if needed.

    The data is written to a sibling ``.tmp`` file first and then moved over
    STATE with os.replace. Readers therefore see either the old or the new
    file, never a half-written one.
    """
    os.makedirs(RUNTIME_DIR, exist_ok=True)
    tmp_path = f"{STATE}.tmp"
    with open(tmp_path, "w") as out:
        json.dump(s, out, indent=2, ensure_ascii=False)
    os.replace(tmp_path, STATE)
# ── pm2 / logs ────────────────────────────────────────────────────────────────
def pm2_status():
    """Return the pm2 status string of the DRAIN process (e.g. 'online', 'stopped').

    Returns:
        'absent' when pm2 lists no process named DRAIN.
        'err:<exception>' when pm2 can't be run or its output has no process list.

    Runs with env=_ENV (HOME forced), like trigger_drain(), so every pm2 call
    in this supervisor addresses the same pm2 home.
    """
    try:
        r = subprocess.run([PM2, "jlist"], capture_output=True, text=True,
                           timeout=30, env=_ENV)
        # Normally stdout is exactly one JSON array. Fall back to the last line
        # that parses as a JSON array, to tolerate other lines pm2 may print
        # around it (presumably daemon / version notices).
        procs = None
        for candidate in (r.stdout, *reversed(r.stdout.splitlines())):
            try:
                parsed = json.loads(candidate)
            except ValueError:
                continue
            if isinstance(parsed, list):
                procs = parsed
                break
        if procs is None:
            raise ValueError("no JSON process list in `pm2 jlist` output")
        for p in procs:
            if p.get("name") == DRAIN:
                return p.get("pm2_env", {}).get("status", "unknown")
    except Exception as e:
        return f"err:{e}"
    return "absent"
def log_age_sec():
    """Seconds since the drain's out-log was last modified, or None if it can't be stat'ed."""
    try:
        now_ts = _now_utc().timestamp()
        return now_ts - os.path.getmtime(OUT_LOG)
    except Exception:
        return None
def tail(path, n=120):
    """Return the last ``n`` lines of the text file at ``path``, with line endings kept.

    Best-effort: if the file can't be opened or read, return [] so that a
    missing or rotated log never breaks a tick. The file is decoded as UTF-8
    (undecodable bytes are replaced), not with the locale encoding. The file
    is streamed, but at most ``n`` lines are held in memory.

    ``n <= 0`` returns []. The old ``readlines()[-n:]`` returned the WHOLE
    file for n=0.
    """
    from collections import deque

    if n <= 0:
        return []
    try:
        with open(path, encoding="utf-8", errors="replace") as f:
            return list(deque(f, maxlen=n))
    except Exception:
        return []
def scan_rate_limit():
    """(recent_429, reset_dt_utc|None) — was the drain's LAST outcome a rate-limit?

    A rate-limit line counts as recent only if it comes after the last success
    line in the same log. Rate-limit lines are: 'api_error_status":429',
    "hit your session limit", or "usage limit" in any case. Success lines are
    'completed stored=' or a '[round' line containing 'processed='.

    The out-log and the error-log are separate files, and their lines cannot be
    ordered against each other by position. Concatenating them made any
    error-log 429 outrank every out-log success. So each log is judged on its
    own. An error-log hit is trusted only if the error log was modified no
    earlier than the out-log, i.e. nothing (in particular no success line)
    reached stdout after it.

    The reset time is parsed from "resets H[:MM]am|pm (UTC)" and mapped to the
    next such wall-clock time in UTC. A time more than a minute in the past
    rolls over to tomorrow. An unparsable time is ignored rather than crashing
    the tick.
    """
    reset_re = re.compile(r"resets\s+(\d{1,2})(?::(\d{2}))?\s*([ap]m)\s*\(UTC\)", re.I)
    now = _now_utc()

    def parse_reset(m):
        # 12-hour clock → 24-hour; None if the numbers don't form a valid time.
        hh, mm, ap = int(m.group(1)), int(m.group(2) or 0), m.group(3).lower()
        if ap == "pm" and hh != 12:
            hh += 12
        if ap == "am" and hh == 12:
            hh = 0
        try:
            cand = now.replace(hour=hh, minute=mm, second=0, microsecond=0)
        except ValueError:
            return None
        if cand <= now - timedelta(minutes=1):
            cand += timedelta(days=1)
        return cand

    def scan(lines):
        # (429 is after the last success within these lines, last parsed reset)
        last_429, last_ok, reset = -1, -1, None
        for i, ln in enumerate(lines):
            if 'api_error_status":429' in ln or "hit your session limit" in ln or "usage limit" in ln.lower():
                last_429 = i
            m = reset_re.search(ln)
            if m:
                reset = parse_reset(m) or reset
            if "completed stored=" in ln or ("[round" in ln and "processed=" in ln):
                last_ok = i
        return (last_429 > last_ok and last_429 >= 0), reset

    def mtime(path):
        try:
            return os.path.getmtime(path)
        except OSError:
            return None

    out_recent, out_reset = scan(tail(OUT_LOG, 200))
    err_recent, err_reset = scan(tail(ERR_LOG, 120))
    if err_recent:
        out_mtime, err_mtime = mtime(OUT_LOG), mtime(ERR_LOG)
        err_recent = out_mtime is None or (err_mtime is not None and err_mtime >= out_mtime)
    if err_recent:
        return True, err_reset or out_reset
    return out_recent, out_reset or err_reset
# ── actions ───────────────────────────────────────────────────────────────────
def trigger_drain(win_start, win_end):
    """Restart the one-shot drain, pushing its window env (idempotent, reboot-safe).

    Runs ``pm2 restart DRAIN --update-env`` with HALACHA_DRAIN_WINDOW_START/END
    set in the child environment. (0, 0) is the burst "window lifted" setting.

    Returns:
        (ok, message). ``message`` is pm2's stderr (or stdout), truncated to
        200 chars. If pm2 cannot be executed or exceeds the 60 s timeout, the
        result is (False, error text) instead of an exception, so a tick
        still saves its state and prints its report.
    """
    env = {**_ENV, "HALACHA_DRAIN_WINDOW_START": str(win_start),
           "HALACHA_DRAIN_WINDOW_END": str(win_end)}
    try:
        r = subprocess.run([PM2, "restart", DRAIN, "--update-env"],
                           capture_output=True, text=True, env=env, timeout=60)
    except (OSError, subprocess.SubprocessError) as e:
        return False, str(e)[:200]
    return r.returncode == 0, (r.stderr or r.stdout)[:200]
def stop_drain():
    """Run ``pm2 stop DRAIN``; return True iff pm2 exited 0.

    Uses env=_ENV, like trigger_drain() and pm2_status(), so the stop reaches
    the same pm2 home. An unrunnable or timed-out pm2 returns False instead of
    raising, so burst expiry inside a tick cannot abort the tick.
    """
    try:
        r = subprocess.run([PM2, "stop", DRAIN], capture_output=True, text=True,
                           timeout=60, env=_ENV)
    except (OSError, subprocess.SubprocessError):
        return False
    return r.returncode == 0
# ── the tick ──────────────────────────────────────────────────────────────────
def tick():
    """Run ONE health tick: observe, take at most one action, persist, report.

    Observes:
        the DB snapshot (queue, staging, burst_until), pm2 status, the
        out-log age and rate-limit markers in the log tails.

    Then:
        expires an overdue burst (clears the DB value and stops the drain), and
        picks exactly one action: none (queue empty / running / off-window),
        hold (rate-limit cooldown), restart a hung run, or trigger an idle drain.

    Writes the new state to STATE. Prints a human summary, then one final
    ``JSON:`` line whose ``next_wake_sec`` suggests when to tick again.
    A DB failure short-circuits to mode ``db_error`` with no action.
    """
    now = _now_utc()
    prev = load_state()
    notes = []
    try:
        snap = db_snapshot()
    except Exception as e:
        save_state({**prev, "tick_at": now.isoformat(), "mode": "db_error"})
        print("⚠️ כשל בקריאת ה-DB:", e)
        print("JSON:" + json.dumps({"ok": False, "error": str(e), "action": "none",
                                    "mode": "db_error", "next_wake_sec": 900}, ensure_ascii=False))
        return

    # ── burst state from the DB + auto-expiry (manual on; auto-off at the deadline)
    burst_until = datetime.fromisoformat(snap["burst_until"]) if snap.get("burst_until") else None
    if burst_until is not None and burst_until.tzinfo is None:
        # A naive value can't be compared with the aware `now` (TypeError).
        # Assume UTC. TODO(review): confirm that burst_until is timestamptz.
        burst_until = burst_until.replace(tzinfo=timezone.utc)
    if burst_until and now >= burst_until:
        try:
            db_set_burst(None)
        except Exception as e:
            notes.append(f"(אזהרה: ניקוי burst ב-DB נכשל: {e})")
        stop_drain()
        notes.append(f"ה-burst הסתיים ({burst_until.astimezone(IDT):%a %H:%M IDT}) — חזרה למצב לילה, הדריינר נעצר.")
        burst_until = None
    burst = bool(burst_until and now < burst_until)
    idt_hour = now.astimezone(IDT).hour
    night_open = (idt_hour >= NIGHT_START) or (idt_hour < NIGHT_END)  # window wraps midnight
    window_open = burst or night_open
    win = (0, 0) if burst else (NIGHT_START, NIGHT_END)  # (0, 0) = window lifted
    phase = "burst" if burst else "night"

    sc = snap["status_counts"]
    pending = int(sc.get("pending", 0))
    processing = int(sc.get("processing", 0))
    done = int(sc.get("completed", 0))
    failed = int(sc.get("failed", 0))
    hal = snap["halachot_total"]
    ck = snap["checkpointed_chunks"]
    proc_cases = snap["processing_cases"]
    status = pm2_status()
    age = log_age_sec()
    rl_recent, reset_dt = scan_rate_limit()
    d_hal = hal - prev.get("halachot_total", hal)
    d_ck = ck - prev.get("checkpointed_chunks", ck)
    d_done = done - prev.get("done", done)

    # ── cooldown — fresh-gated (an hours-old 429 is historical); honor a stored future reset
    fresh = (age is not None and age < 1800)
    rl_active = bool(rl_recent and fresh)
    cd_dt = None
    if rl_active and reset_dt:
        cd_dt = reset_dt
    elif prev.get("cooldown_until"):
        try:
            cd_dt = datetime.fromisoformat(prev["cooldown_until"])
        except Exception:
            cd_dt = None
    cooldown_until = cd_dt.isoformat() if cd_dt else None
    in_cooldown = bool(cd_dt and now < cd_dt)
    weekly = bool(cd_dt and (cd_dt - now) > timedelta(hours=WEEKLY_GAP_HOURS))

    # ── progress-based liveness (chunk checkpoints, NOT log mtime).
    # The clock also restarts when the drain is online now but was not at the
    # previous tick (started by pm2, run-now, or anyone else). A fresh run must
    # not be measured against the PREVIOUS run's last checkpoint, or it would
    # be killed as "hung" on the very next tick.
    newly_online = status == "online" and prev.get("pm2") != "online"
    if ck > prev.get("checkpointed_chunks", ck) or newly_online:
        last_progress_at = now
    else:
        lp = prev.get("last_progress_at")
        try:
            last_progress_at = datetime.fromisoformat(lp) if lp else now
        except Exception:
            last_progress_at = now
    progress_age = (now - last_progress_at).total_seconds()
    hung = (status == "online" and prev.get("last_progress_at") is not None
            and progress_age > STUCK_SILENCE_SEC)

    # ── decide ONE action ──
    action, detail, mode = "none", "", "running"
    if pending == 0 and processing == 0:
        mode = "done"
        notes.append("התור ריק — אין מה לחלץ.")
    elif in_cooldown:
        mode = "weekly_exhausted" if weekly else "ratelimited"
        action = "hold"
        notes.append(f"rate-limit פעיל; איפוס ~{cd_dt.astimezone(IDT):%H:%M IDT}.")
    elif hung:
        mode = "hung"
        ok, detail = trigger_drain(*win)
        action = "restart-hung" if ok else "restart-hung-failed"
        notes.append(f"זוהתה תקיעה ({int(progress_age)}ש' ללא checkpoint חדש) — בוצע restart.")
    elif status == "online":
        mode = "running"
        notes.append(f"רץ ({len(proc_cases)} בעיבוד, התקדמות לפני {int(progress_age)}ש').")
    elif not window_open:
        mode = "idle_off_window"
        notes.append(f"מחוץ לחלון-הלילה ({NIGHT_START:02d}:00–{NIGHT_END:02d}:00) — ממתין; {pending} בתור.")
    else:
        ok, detail = trigger_drain(*win)
        action = "triggered" if ok else "trigger-failed"
        notes.append(f"הדריינר היה בטל — הוצת ({phase})." if ok else f"הצתה נכשלה: {detail}")

    # A run we just (re)started gets a full STUCK_SILENCE_SEC of grace from now.
    restarted = action in ("triggered", "restart-hung")
    if restarted:
        last_progress_at = now

    staging_ok = not (d_done > 0 and d_ck == 0)
    if not staging_ok:
        notes.append("⚠️ תיקים הושלמו אך checkpoints לא התקדמו — לבדוק staging.")
    if mode in ("ratelimited", "weekly_exhausted") and cd_dt:
        # Wake ~3 min after the reset, clamped to [5 min, 1 h].
        next_wake = max(300, min(int((cd_dt - now).total_seconds()) + 180, 3600))
    elif mode in ("done", "idle_off_window"):
        next_wake = 1800
    else:
        next_wake = 900

    save_state({
        "tick_at": now.isoformat(), "phase": phase,
        "burst_until": burst_until.isoformat() if burst_until else None,
        "halachot_total": hal, "checkpointed_chunks": ck, "done": done,
        "pending": pending, "processing": processing, "processing_cases": proc_cases,
        "last_progress_at": last_progress_at.isoformat(),
        "cooldown_until": cooldown_until if in_cooldown else None,
        "mode": mode, "action": action,
        # pm2 status as of the END of this tick: after a successful (re)start
        # the drain is expected online, so the next tick won't reset the clock again.
        "pm2": "online" if restarted else status,
    })

    elapsed = (f" (מאז הקודם: +{d_hal} הלכות, +{d_ck} מקטעים, +{d_done} תיקים)"
               if prev.get("tick_at") else "")
    bu = f" burst→{burst_until.astimezone(IDT):%a %d/%m %H:%M}" if burst else ""
    print(f"🕒 {now.astimezone(IDT):%H:%M:%S IDT} | מצב: {mode} ({phase}){bu} | פעולה: {action}")
    print(f" תור: pending={pending} processing={processing} done={done} failed={failed}")
    print(f" staging: halachot={hal} checkpointed_chunks={ck} pending_review={snap['pending_review']}{elapsed}")
    print(f" process: pm2={status} progress_age={int(progress_age)}s staging_ok={staging_ok}")
    if proc_cases:
        print(f" בעיבוד עכשיו: {', '.join(proc_cases)}")
    for n in notes:
        print(f"{n}")
    print("JSON:" + json.dumps({
        "ok": True, "mode": mode, "phase": phase, "action": action, "next_wake_sec": next_wake,
        "burst_until": burst_until.isoformat() if burst_until else None,
        "pending": pending, "processing": processing, "done": done, "failed": failed,
        "halachot_total": hal, "checkpointed_chunks": ck,
        "delta_halachot": d_hal, "delta_checkpoints": d_ck, "delta_done": d_done,
        "pm2": status, "progress_age_sec": int(progress_age),
        "staging_ok": staging_ok, "rate_limited": in_cooldown, "cooldown_until": cooldown_until,
    }, ensure_ascii=False))
def cmd_status():
    """Print the current mode (BURST/night), the last tick's saved summary and pm2 status.

    Takes no action. If burst_until can't be read from the DB, the error is
    printed and the remaining lines (saved state, pm2) are still shown.
    """
    try:
        bu = db_get_burst()
    except Exception as e:
        print("⚠️ כשל בקריאת ה-DB:", e)
    else:
        if bu is not None and bu.tzinfo is None:
            bu = bu.replace(tzinfo=timezone.utc)  # naive → assume UTC (aware compare below)
        now = _now_utc()
        burst = bool(bu and now < bu)
        window = f" (חלון {NIGHT_START:02d}:00–{NIGHT_END:02d}:00)"
        print(f"מצב: {'BURST' if burst else 'night'}"
              + (f" → עד {bu.astimezone(IDT):%a %d/%m %H:%M IDT}" if burst else window))
    st = load_state()
    if st:
        print(f"טיק אחרון: {st.get('tick_at','')} | mode={st.get('mode')} action={st.get('action')}")
        print(f"תור: pending={st.get('pending')} processing={st.get('processing')} "
              f"done={st.get('done')} | staging halachot={st.get('halachot_total')}")
    print(f"דריינר pm2: {pm2_status()}")
def main():
    """CLI entry point: ``tick`` (default) | ``burst-on [--until]`` | ``burst-off`` | ``status``.

    burst-on:
        Writes the deadline to the DB and ticks immediately. The deadline is
        ``--until`` (a naive value is read as IDT) or the next Saturday
        18:00 IDT.
        An unparsable or non-future ``--until`` is rejected with a usage error.
        A past deadline would otherwise be written and then expired by the
        very next tick, which also stops the drain — i.e. act like burst-off.

    burst-off:
        Clears the DB value, stops the drain, and ticks.
    """
    ap = argparse.ArgumentParser(description="halacha-drain supervisor")
    ap.add_argument("cmd", nargs="?", default="tick",
                    choices=["tick", "burst-on", "burst-off", "status"])
    ap.add_argument("--until", help="burst deadline override, ISO or 'YYYY-MM-DD HH:MM' (IDT)")
    a = ap.parse_args()
    if a.cmd == "burst-on":
        if a.until:
            try:
                until = datetime.fromisoformat(a.until)
            except ValueError:
                try:
                    until = datetime.strptime(a.until, "%Y-%m-%d %H:%M")
                except ValueError:
                    ap.error(f"--until: cannot parse {a.until!r} "
                             "(expected ISO 8601 or 'YYYY-MM-DD HH:MM')")
            if until.tzinfo is None:
                until = until.replace(tzinfo=IDT)
            if until <= _now_utc():
                ap.error(f"--until {a.until!r} is not in the future")
        else:
            until = next_saturday_18_utc()
        db_set_burst(until.isoformat())
        print(f"✅ BURST הופעל — רץ עד {until.astimezone(IDT):%A %d/%m %H:%M IDT} (או burst-off).")
        tick()  # kick off immediately
    elif a.cmd == "burst-off":
        db_set_burst(None)
        stopped = stop_drain()
        print(f"🛑 BURST כובה. הדריינר {'נעצר' if stopped else 'לא נעצר (כבר עצור)'}; "
              f"חזרה למצב לילה ({NIGHT_START:02d}:00–{NIGHT_END:02d}:00).")
        tick()
    elif a.cmd == "status":
        cmd_status()
    else:
        tick()


if __name__ == "__main__":
    main()