#!/usr/bin/env python3
"""Halacha-drain supervisor — permanent health manager for legal-halacha-drain.

Babysits the EXISTING pm2 drain so the halacha-extraction backlog gets worked
reliably. Takes ZERO Claude quota itself (only the drain calls Opus); it just
reads DB/logs/pm2 and pokes the drain via the established run-now mechanism.

Subcommands (argparse):
  tick        (default — what the pm2 cron fires every 15 min) run ONE health tick
  burst-on    start a daytime BURST — drain runs continuously until the upcoming
              Saturday 18:00 IDT (or --until). MANUAL ONLY; kicks off immediately.
  burst-off   stop the burst now (revert to night mode) and halt the running drain
  status      print current mode + queue + drain state (no action)

BURST is the chair's manual "run continuously now" window. Source of truth is
the DB (`drain_controls.burst_until`, keyed by the drain's pm2 name) — the SAME
value the /operations page reads/writes (G1, single source; G2, no parallel
control path). burst-on/off here and the /operations toggle are equivalent
front-ends.

One tick: reads queue + per-chunk staging + burst_until from the DB, pm2
status, log tails →
  • re-triggers the one-shot drain when idle and the queue is non-empty
  • restarts a HUNG run (online but no new chunk-checkpoint for > 25 min — the
    REAL liveness signal; the out-log only updates when a whole CASE finishes).
    The 25-minute clock restarts whenever the drain is (re)started, so a fresh
    run is never judged by checkpoints of a previous run.
  • backs off on rate-limit (claude_session 429) until the CLI's parsed reset;
    a 429 only counts when the log is FRESH (an hours-old 429 is historical)
  • verifies crash-safe per-chunk staging is committing (nothing lost)

TWO MODES (never self-stops):
  • burst — while burst_until is set and in the future: window LIFTED
    (HALACHA_DRAIN_WINDOW_START==END==0), drain runs all day.
  • night — otherwise: the drain's normal 23:00–05:00 IDT window; the
    supervisor stays on as the permanent nightly-drain health manager.

Burst auto-EXPIRES at its deadline (DB cleared + running drain stopped) so it
never bleeds into a fresh week's quota uninvited.
"""

import argparse
import json
import os
import re
import subprocess
from collections import deque
from datetime import datetime, timedelta, timezone
from glob import glob

REPO = "/home/chaim/legal-ai"
RUNTIME_DIR = "/home/chaim/halacha-drain-monitor"  # state (outside repo)
STATE = os.path.join(RUNTIME_DIR, "state.json")
DRAIN = "legal-halacha-drain"
OUT_LOG = "/home/chaim/.pm2/logs/legal-halacha-drain-out.log"
ERR_LOG = "/home/chaim/.pm2/logs/legal-halacha-drain-error.log"
VENV_PY = os.path.join(REPO, "mcp-server/.venv/bin/python")

STUCK_SILENCE_SEC = 1500  # 25 min with no new chunk-checkpoint while online → hung
WEEKLY_GAP_HOURS = 6      # reset further than this → treat as weekly, not 5h
# Fixed UTC+3 offset (Israel summer time). Used for display AND for the
# night-window / Saturday-18:00 arithmetic below; being a fixed offset it does
# not follow a switch to winter time.
# NOTE(review): confirm the drain's own window uses the same fixed offset.
IDT = timezone(timedelta(hours=3))
NIGHT_START, NIGHT_END = 23, 5  # the drain's normal window (IDT hours)

# "resets 3:30pm (UTC)" as printed by the CLI on a rate-limit message.
_RESET_RE = re.compile(r"resets\s+(\d{1,2}):(\d{2})\s*([ap]m)\s*\(UTC\)", re.I)

# Environment for every child process: the caller's env with HOME pinned, so
# pm2 and the venv helper resolve the same home whatever launched the tick.
_ENV = {**os.environ, "HOME": "/home/chaim"}


def _now_utc():
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


def pm2_bin():
    """Return the first pm2 executable that can be launched.

    Tries ``pm2`` on PATH, then any nvm-installed pm2 under /home/chaim/.nvm.
    Falls back to the bare name ``"pm2"`` when none could be started.
    """
    for c in ["pm2", *glob("/home/chaim/.nvm/versions/node/*/bin/pm2")]:
        try:
            subprocess.run([c, "-v"], capture_output=True, timeout=10, env=_ENV)
            return c
        except (OSError, subprocess.SubprocessError):
            continue
    return "pm2"


PM2 = pm2_bin()


# ── DB access (via the repo venv; the module self-configures) ────────────────
def _venv_py(code: str, timeout: int = 120) -> str:
    """Run *code* with the repo venv's interpreter and return its stripped stdout.

    Raises:
        RuntimeError: the child exited non-zero. The message is the LAST 300
            characters of stderr (or stdout), because the end of a traceback
            is where the actual exception is.
    """
    r = subprocess.run([VENV_PY, "-c", code], capture_output=True, text=True,
                       cwd=REPO, timeout=timeout, env=_ENV)
    if r.returncode != 0:
        raise RuntimeError((r.stderr or r.stdout).strip()[-300:])
    return r.stdout.strip()


def db_snapshot() -> dict:
    """Queue + staging counts + burst_until, in one round-trip.

    Returns the dict printed by the helper snippet, with keys
    ``status_counts``, ``processing_cases``, ``halachot_total``,
    ``checkpointed_chunks``, ``pending_review`` and ``burst_until``
    (ISO string or None). Only the last stdout line is parsed, so anything
    the imported module prints before it is ignored.
    """
    code = (
        "import sys,os,asyncio,json\n"
        "sys.path.insert(0,'mcp-server/src'); os.environ.setdefault('HOME','/home/chaim')\n"
        "from legal_mcp.services import db\n"
        "async def m():\n"
        "    pool=await db.get_pool()\n"
        "    async with pool.acquire() as c:\n"
        "        st={r['halacha_extraction_status'] or 'unknown':r['n'] for r in await c.fetch(\"SELECT halacha_extraction_status,count(*) n FROM case_law GROUP BY 1\")}\n"
        "        procs=[r['case_number'] for r in await c.fetch(\"SELECT case_number FROM case_law WHERE halacha_extraction_status='processing' ORDER BY halacha_extraction_requested_at NULLS LAST LIMIT 5\")]\n"
        "        hal=await c.fetchval('SELECT count(*) FROM halachot')\n"
        "        ck=await c.fetchval('SELECT count(*) FROM precedent_chunks WHERE halacha_extracted_at IS NOT NULL')\n"
        "        pend_rev=await c.fetchval(\"SELECT count(*) FROM halachot WHERE review_status='pending_review'\")\n"
        "        bu=await c.fetchval(\"SELECT burst_until FROM drain_controls WHERE name='legal-halacha-drain'\")\n"
        "    print(json.dumps({'status_counts':st,'processing_cases':procs,'halachot_total':hal,'checkpointed_chunks':ck,'pending_review':pend_rev,'burst_until':bu.isoformat() if bu else None}))\n"
        "asyncio.run(m())\n"
    )
    return json.loads(_venv_py(code).splitlines()[-1])


_SET_BURST_SQL = (
    "INSERT INTO drain_controls (name, burst_until, updated_at) "
    "VALUES ('legal-halacha-drain', $1, now()) "
    "ON CONFLICT (name) DO UPDATE SET burst_until = $1, updated_at = now()"
)


def db_set_burst(until_iso):
    """Upsert ``drain_controls.burst_until`` for the drain.

    Args:
        until_iso: ISO-8601 datetime string for the burst deadline, or None
            (or empty) to clear the burst.
    """
    # Raw SQL via get_pool (NOT db.set_drain_burst): the host repo checkout can lag
    # main, so the supervisor depends only on get_pool + the burst_until column —
    # the column is the shared contract with the container-side /operations API.
    code = (
        "import sys,os,asyncio\n"
        "sys.path.insert(0,'mcp-server/src'); os.environ.setdefault('HOME','/home/chaim')\n"
        "from datetime import datetime\n"
        "from legal_mcp.services import db\n"
        f"v={until_iso!r}\n"
        f"SQL={_SET_BURST_SQL!r}\n"
        "async def m():\n"
        "    pool=await db.get_pool()\n"
        "    async with pool.acquire() as c:\n"
        "        await c.execute(SQL, datetime.fromisoformat(v) if v else None)\n"
        "asyncio.run(m())\n"
    )
    _venv_py(code, timeout=60)


def db_get_burst():
    """Return the stored burst deadline as a datetime, or None when unset."""
    code = (
        "import sys,os,asyncio\n"
        "sys.path.insert(0,'mcp-server/src'); os.environ.setdefault('HOME','/home/chaim')\n"
        "from legal_mcp.services import db\n"
        "async def m():\n"
        "    pool=await db.get_pool()\n"
        "    async with pool.acquire() as c:\n"
        "        v=await c.fetchval(\"SELECT burst_until FROM drain_controls WHERE name='legal-halacha-drain'\")\n"
        "    print(v.isoformat() if v else '')\n"
        "asyncio.run(m())\n"
    )
    out = _venv_py(code, timeout=60).strip()
    return datetime.fromisoformat(out) if out else None


def next_saturday_18_utc():
    """Return the next Saturday 18:00 (in the IDT offset) as a UTC datetime.

    If it is already Saturday at or past 18:00, the following Saturday is used.
    """
    now_idt = _now_utc().astimezone(IDT)
    days = (5 - now_idt.weekday()) % 7  # Mon=0..Sat=5..Sun=6
    cand = now_idt.replace(hour=18, minute=0, second=0, microsecond=0) + timedelta(days=days)
    if cand <= now_idt:
        cand += timedelta(days=7)
    return cand.astimezone(timezone.utc)


# ── state ────────────────────────────────────────────────────────────────────
def load_state():
    """Load the previous tick's state dict; return {} when missing or unusable."""
    try:
        with open(STATE) as f:
            data = json.load(f)
    except (OSError, ValueError):
        return {}
    # tick() calls .get() on the result, so anything but an object is unusable.
    return data if isinstance(data, dict) else {}


def save_state(s):
    """Write the state dict *s* to STATE via a temp file + os.replace."""
    os.makedirs(RUNTIME_DIR, exist_ok=True)
    tmp = STATE + ".tmp"
    with open(tmp, "w") as f:
        json.dump(s, f, ensure_ascii=False, indent=2)
    os.replace(tmp, STATE)


# ── pm2 / logs ────────────────────────────────────────────────────────────────
def pm2_status():
    """Return the drain's pm2 status string.

    Returns the ``pm2_env.status`` value of the DRAIN process ("unknown" when
    the field is missing), ``"absent"`` when pm2 lists no such process, or
    ``"err:<message>"`` when pm2 could not be queried or its output parsed.
    """
    try:
        r = subprocess.run([PM2, "jlist"], capture_output=True, text=True,
                           timeout=30, env=_ENV)
        for p in json.loads(r.stdout):
            if p.get("name") == DRAIN:
                return p.get("pm2_env", {}).get("status", "unknown")
    except Exception as e:
        return f"err:{e}"
    return "absent"


def log_age_sec():
    """Return seconds since OUT_LOG was last modified, or None if unreadable."""
    try:
        return _now_utc().timestamp() - os.path.getmtime(OUT_LOG)
    except Exception:
        return None


def tail(path, n=120):
    """Return the last *n* lines of *path* (newlines kept); [] when unreadable.

    The file is streamed through a bounded deque, so only *n* lines are held
    in memory regardless of how large the log has grown.
    """
    try:
        with open(path, errors="replace") as f:
            return list(deque(f, maxlen=n))
    except Exception:
        return []


def _parse_reset_utc(line, now):
    """Extract the "resets H:MMam/pm (UTC)" time from *line* as a datetime.

    The clock time is placed on *now*'s date; if that moment is more than a
    minute in the past it is rolled to the next day. Returns None when the
    line has no reset time or the time is not a valid clock value.
    """
    m = _RESET_RE.search(line)
    if not m:
        return None
    hh, mm, ap = int(m.group(1)), int(m.group(2)), m.group(3).lower()
    if ap == "pm" and hh != 12:
        hh += 12
    if ap == "am" and hh == 12:
        hh = 0
    try:
        cand = now.replace(hour=hh, minute=mm, second=0, microsecond=0)
    except ValueError:
        # Malformed time in the log (e.g. hour 25): ignore it, do not crash the tick.
        return None
    if cand <= now - timedelta(minutes=1):
        cand += timedelta(days=1)
    return cand


def scan_rate_limit():
    """(recent_429, reset_dt_utc|None) — a 429 counts as recent only if it is
    the LAST drain outcome (after the last 'completed' line).

    Lines are scanned as out-log tail followed by err-log tail and compared by
    position in that combined list, so a 429 found in the err-log tail always
    ranks after every completion line of the out-log. The caller's freshness
    gate on the out-log mtime is what filters historical 429s.
    """
    lines = tail(OUT_LOG, 200) + tail(ERR_LOG, 120)
    now = _now_utc()
    last_429, last_ok, reset_dt = -1, -1, None
    for i, ln in enumerate(lines):
        if 'api_error_status":429' in ln or "hit your session limit" in ln or "usage limit" in ln.lower():
            last_429 = i
            parsed = _parse_reset_utc(ln, now)
            if parsed is not None:
                reset_dt = parsed
        if "completed stored=" in ln or ("[round" in ln and "processed=" in ln):
            last_ok = i
    return (last_429 > last_ok and last_429 >= 0), reset_dt


# ── actions ───────────────────────────────────────────────────────────────────
def trigger_drain(win_start, win_end):
    """Restart the one-shot drain, pushing its window env (idempotent, reboot-safe).

    Returns:
        (ok, detail): ok is True when ``pm2 restart`` exited 0; detail is up to
        200 characters of pm2 output, or the error text if pm2 could not run.
    """
    env = {**_ENV, "HALACHA_DRAIN_WINDOW_START": str(win_start),
           "HALACHA_DRAIN_WINDOW_END": str(win_end)}
    try:
        r = subprocess.run([PM2, "restart", DRAIN, "--update-env"],
                           capture_output=True, text=True, env=env, timeout=60)
    except (OSError, subprocess.SubprocessError) as e:
        return False, str(e)[:200]
    return r.returncode == 0, (r.stderr or r.stdout)[:200]


def stop_drain():
    """Run ``pm2 stop`` on the drain; True when pm2 exited 0, False otherwise."""
    try:
        r = subprocess.run([PM2, "stop", DRAIN], capture_output=True, text=True,
                           timeout=60, env=_ENV)
    except (OSError, subprocess.SubprocessError):
        return False
    return r.returncode == 0


def _progress_clock(prev, ck, status, now):
    """Return the moment from which "no progress" is measured for this tick.

    The clock is *now* when a new chunk checkpoint appeared since the previous
    tick, when the drain has just come online (previous tick recorded a pm2
    status other than "online"), or when no usable timestamp is stored.
    Otherwise it is the stored ``last_progress_at``.
    """
    if ck > prev.get("checkpointed_chunks", ck):
        return now
    # Time the drain spent stopped must not count toward the hang threshold.
    # State files written before the "pm2" key existed keep the stored clock.
    if status == "online" and "pm2" in prev and prev["pm2"] != "online":
        return now
    lp = prev.get("last_progress_at")
    try:
        return datetime.fromisoformat(lp) if lp else now
    except (TypeError, ValueError):
        return now


# ── the tick ──────────────────────────────────────────────────────────────────
def tick():
    """Run one health tick: observe, take at most ONE action, persist, report.

    Reads the DB snapshot, pm2 status and log tails; expires an overdue burst;
    then picks the first matching branch (queue empty → cooldown → pm2
    unreadable → hung → running → off-window → trigger). Prints a human
    summary followed by a single ``JSON:`` line, and saves state for the next
    tick. On a DB read failure it records ``db_error`` and returns early.
    """
    now = _now_utc()
    prev = load_state()
    notes = []
    try:
        snap = db_snapshot()
    except Exception as e:
        save_state({**prev, "tick_at": now.isoformat(), "mode": "db_error"})
        print("⚠️ כשל בקריאת ה-DB:", e)
        print("JSON:" + json.dumps({"ok": False, "error": str(e), "action": "none",
                                    "mode": "db_error", "next_wake_sec": 900},
                                   ensure_ascii=False))
        return

    # burst state from the DB + auto-expiry (manual on; auto-off at the deadline)
    burst_until = datetime.fromisoformat(snap["burst_until"]) if snap.get("burst_until") else None
    if burst_until and now >= burst_until:
        try:
            db_set_burst(None)
        except Exception as e:
            notes.append(f"(אזהרה: ניקוי burst ב-DB נכשל: {e})")
        stop_drain()
        notes.append(f"ה-burst הסתיים ({burst_until.astimezone(IDT):%a %H:%M IDT}) — חזרה למצב לילה, הדריינר נעצר.")
        burst_until = None
    burst = bool(burst_until and now < burst_until)

    idt_hour = now.astimezone(IDT).hour
    night_open = (idt_hour >= NIGHT_START) or (idt_hour < NIGHT_END)
    window_open = burst or night_open
    win = (0, 0) if burst else (NIGHT_START, NIGHT_END)
    phase = "burst" if burst else "night"

    sc = snap["status_counts"]
    pending = int(sc.get("pending", 0))
    processing = int(sc.get("processing", 0))
    done = int(sc.get("completed", 0))
    failed = int(sc.get("failed", 0))
    hal = snap["halachot_total"]
    ck = snap["checkpointed_chunks"]
    proc_cases = snap["processing_cases"]

    status = pm2_status()
    age = log_age_sec()
    rl_recent, reset_dt = scan_rate_limit()

    d_hal = hal - prev.get("halachot_total", hal)
    d_ck = ck - prev.get("checkpointed_chunks", ck)
    d_done = done - prev.get("done", done)

    # cooldown — fresh-gated; honor a stored future reset
    fresh = (age is not None and age < 1800)
    rl_active = bool(rl_recent and fresh)
    cd_dt = None
    if rl_active and reset_dt:
        cd_dt = reset_dt
    elif prev.get("cooldown_until"):
        try:
            cd_dt = datetime.fromisoformat(prev["cooldown_until"])
        except Exception:
            cd_dt = None
    cooldown_until = cd_dt.isoformat() if cd_dt else None
    in_cooldown = bool(cd_dt and now < cd_dt)
    weekly = bool(cd_dt and (cd_dt - now) > timedelta(hours=WEEKLY_GAP_HOURS))

    # progress-based liveness (chunk checkpoints, NOT log mtime)
    last_progress_at = _progress_clock(prev, ck, status, now)
    progress_age = (now - last_progress_at).total_seconds()
    hung = (status == "online" and prev.get("last_progress_at") is not None
            and progress_age > STUCK_SILENCE_SEC)

    # ── decide ONE action ──
    action, detail, mode = "none", "", "running"
    restarted = False  # True once this tick successfully (re)started the drain
    if pending == 0 and processing == 0:
        mode = "done"
        notes.append("התור ריק — אין מה לחלץ.")
    elif in_cooldown:
        mode = "weekly_exhausted" if weekly else "ratelimited"
        action = "hold"
        notes.append(f"rate-limit פעיל; איפוס ~{cd_dt.astimezone(IDT):%H:%M IDT}.")
    elif status.startswith("err:"):
        # pm2 state is unknown: restarting now could kill a healthy running
        # drain, so do nothing and let the next tick retry.
        mode = "pm2_error"
        notes.append(f"לא ניתן לקרוא את מצב pm2 ({status}) — לא מבוצעת פעולה.")
    elif hung:
        mode = "hung"
        ok, detail = trigger_drain(*win)
        restarted = ok
        action = "restart-hung" if ok else "restart-hung-failed"
        if ok:
            notes.append(f"זוהתה תקיעה ({int(progress_age)}ש' ללא checkpoint חדש) — בוצע restart.")
        else:
            notes.append(f"זוהתה תקיעה ({int(progress_age)}ש' ללא checkpoint חדש) — restart נכשל: {detail}")
    elif status == "online":
        mode = "running"
        notes.append(f"רץ ({len(proc_cases)} בעיבוד, התקדמות לפני {int(progress_age)}ש').")
    elif not window_open:
        mode = "idle_off_window"
        notes.append(f"מחוץ לחלון-הלילה ({NIGHT_START}:00–0{NIGHT_END}:00) — ממתין; {pending} בתור.")
    else:
        ok, detail = trigger_drain(*win)
        restarted = ok
        action = "triggered" if ok else "trigger-failed"
        notes.append(f"הדריינר היה בטל — הוצת ({phase})." if ok else f"הצתה נכשלה: {detail}")

    if restarted:
        # A freshly (re)started run gets a full STUCK_SILENCE_SEC to produce its
        # first checkpoint; without this the stale clock would flag it as hung
        # again on the very next tick. progress_age above keeps the observed
        # value for this tick's report.
        last_progress_at = now

    staging_ok = not (d_done > 0 and d_ck == 0)
    if not staging_ok:
        notes.append("⚠️ תיקים הושלמו אך checkpoints לא התקדמו — לבדוק staging.")

    if mode in ("ratelimited", "weekly_exhausted") and cd_dt:
        next_wake = max(300, min(int((cd_dt - now).total_seconds()) + 180, 3600))
    elif mode in ("done", "idle_off_window"):
        next_wake = 1800
    else:
        next_wake = 900

    save_state({
        "tick_at": now.isoformat(),
        "phase": phase,
        "burst_until": burst_until.isoformat() if burst_until else None,
        "halachot_total": hal,
        "checkpointed_chunks": ck,
        "done": done,
        "pending": pending,
        "processing": processing,
        "processing_cases": proc_cases,
        "last_progress_at": last_progress_at.isoformat(),
        "cooldown_until": cooldown_until if in_cooldown else None,
        "mode": mode,
        "action": action,
        "pm2": status,  # lets the next tick detect a non-online → online transition
    })

    elapsed = (f" (מאז הקודם: +{d_hal} הלכות, +{d_ck} מקטעים, +{d_done} תיקים)"
               if prev.get("tick_at") else "")
    bu = f" burst→{burst_until.astimezone(IDT):%a %d/%m %H:%M}" if burst else ""
    print(f"🕒 {now.astimezone(IDT):%H:%M:%S IDT} | מצב: {mode} ({phase}){bu} | פעולה: {action}")
    print(f" תור: pending={pending} processing={processing} done={done} failed={failed}")
    print(f" staging: halachot={hal} checkpointed_chunks={ck} pending_review={snap['pending_review']}{elapsed}")
    print(f" process: pm2={status} progress_age={int(progress_age)}s staging_ok={staging_ok}")
    if proc_cases:
        print(f" בעיבוד עכשיו: {', '.join(proc_cases)}")
    for n in notes:
        print(f" • {n}")
    print("JSON:" + json.dumps({
        "ok": True,
        "mode": mode,
        "phase": phase,
        "action": action,
        "next_wake_sec": next_wake,
        "burst_until": burst_until.isoformat() if burst_until else None,
        "pending": pending,
        "processing": processing,
        "done": done,
        "failed": failed,
        "halachot_total": hal,
        "checkpointed_chunks": ck,
        "delta_halachot": d_hal,
        "delta_checkpoints": d_ck,
        "delta_done": d_done,
        "pm2": status,
        "progress_age_sec": int(progress_age),
        "staging_ok": staging_ok,
        "rate_limited": in_cooldown,
        "cooldown_until": cooldown_until,
    }, ensure_ascii=False))


def cmd_status():
    """Print the current mode, the last saved tick summary and the pm2 status.

    Reads burst_until from the DB and the state file; performs no action.
    """
    bu = db_get_burst()
    now = _now_utc()
    burst = bool(bu and now < bu)
    st = load_state()
    print(f"מצב: {'BURST' if burst else 'night'}"
          + (f" → עד {bu.astimezone(IDT):%a %d/%m %H:%M IDT}" if burst else " (חלון 23:00–05:00)"))
    if st:
        print(f"טיק אחרון: {st.get('tick_at','—')} | mode={st.get('mode')} action={st.get('action')}")
        print(f"תור: pending={st.get('pending')} processing={st.get('processing')} "
              f"done={st.get('done')} | staging halachot={st.get('halachot_total')}")
    print(f"דריינר pm2: {pm2_status()}")


def main():
    """Parse the subcommand and dispatch (default: one ``tick``).

    ``burst-on`` stores the deadline (``--until``, or the next Saturday 18:00)
    and runs a tick immediately; ``burst-off`` clears it, stops the drain and
    runs a tick; ``status`` only prints. An unparseable or non-future
    ``--until`` is rejected with an argparse error before anything is written.
    """
    ap = argparse.ArgumentParser(description="halacha-drain supervisor")
    ap.add_argument("cmd", nargs="?", default="tick",
                    choices=["tick", "burst-on", "burst-off", "status"])
    ap.add_argument("--until", help="burst deadline override, ISO or 'YYYY-MM-DD HH:MM' (IDT)")
    a = ap.parse_args()

    if a.cmd == "burst-on":
        if a.until:
            try:
                until = datetime.fromisoformat(a.until)
            except ValueError:
                try:
                    until = datetime.strptime(a.until, "%Y-%m-%d %H:%M")
                except ValueError:
                    ap.error(f"--until: cannot parse {a.until!r}; expected ISO or 'YYYY-MM-DD HH:MM'")
            # A deadline given without an offset is interpreted in IDT.
            if until.tzinfo is None:
                until = until.replace(tzinfo=IDT)
            # A past deadline would be stored and then expired by the tick
            # below, which also stops the running drain — refuse it instead.
            if until <= _now_utc():
                ap.error("--until must be in the future")
        else:
            until = next_saturday_18_utc()
        db_set_burst(until.isoformat())
        print(f"✅ BURST הופעל — רץ עד {until.astimezone(IDT):%A %d/%m %H:%M IDT} (או burst-off).")
        tick()  # kick off immediately
    elif a.cmd == "burst-off":
        db_set_burst(None)
        stopped = stop_drain()
        print(f"🛑 BURST כובה. הדריינר {'נעצר' if stopped else 'לא נעצר (כבר עצור)'}; "
              f"חזרה למצב לילה (23:00–05:00).")
        tick()
    elif a.cmd == "status":
        cmd_status()
    else:
        tick()


if __name__ == "__main__":
    main()