Files
legal-ai/scripts/halacha_drain_supervisor.py
Chaim eac4dd3ac9
All checks were successful
G12 Leak-Guard / leak-guard (pull_request) Successful in 8s
fix(supervisor): gate + display weekly-Sonnet, not weekly-Opus
On this claude.ai account the populated per-model weekly cap is Sonnet;
seven_day_opus is null (no separate Opus cap). So quota_available() now gates on
five_hour + seven_day + seven_day_sonnet (was seven_day_opus, which never bound),
and `status` prints weekly-Sonnet. The all-models seven_day cap remains the
backstop for Opus usage regardless. Matches the /operations display (#245).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-13 10:58:05 +00:00

574 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Halacha-drain supervisor — permanent health manager for legal-halacha-drain.
Babysits the EXISTING pm2 drain so the halacha-extraction backlog gets worked
reliably. Takes ZERO Claude quota itself (only the drain calls Opus); it just
reads DB/logs/pm2 and pokes the drain via the established run-now mechanism.
Subcommands (argparse):
tick (default — what the pm2 cron fires every 15 min) run ONE health tick
burst-on start a daytime BURST — drain runs continuously until the upcoming
Saturday 18:00 IDT (or --until). MANUAL ONLY; kicks off immediately.
burst-off stop the burst now (revert to night mode) and halt the running drain
status print current mode + queue + drain state (no action)
BURST is the chair's manual "run continuously now" window. Source of truth is the
DB (`drain_controls.burst_until`, keyed by the drain's pm2 name) — the SAME value
the /operations page reads/writes (G1, single source; G2, no parallel control
path). burst-on/off here and the /operations toggle are equivalent front-ends.
One tick: reads queue + per-chunk staging + burst_until from the DB, pm2 status,
log tails →
• re-triggers the one-shot drain when idle and the queue is non-empty
• restarts a HUNG run (online but no new chunk-checkpoint for > 25 min — the
REAL liveness signal; the out-log only updates when a whole CASE finishes)
• backs off on rate-limit (claude_session 429) until the CLI's parsed reset;
a 429 only counts when the log is FRESH (an hours-old 429 is historical)
• verifies crash-safe per-chunk staging is committing (nothing lost)
TWO MODES (never self-stops):
• burst — while burst_until is set and in the future: window LIFTED
(HALACHA_DRAIN_WINDOW_START==END==0), drain runs all day.
• night — otherwise: the drain's normal 23:00-05:00 IDT window; the supervisor
stays on as the permanent nightly-drain health manager.
Burst auto-EXPIRES at its deadline (DB cleared + running drain stopped) so it
never bleeds into a fresh week's quota uninvited.
"""
import argparse
import json
import os
import re
import subprocess
import urllib.request
from datetime import datetime, timedelta, timezone
from glob import glob
# Repo checkout; used as the working directory for child processes.
REPO = "/home/chaim/legal-ai"
# claude.ai subscription usage — the same 5-hour / 7-day utilization the Claude
# Code status bar shows, via the (undocumented) OAuth usage endpoint. The token
# lives in the CLI's own credentials file; the claude-code User-Agent is
# REQUIRED — without it the request lands in an aggressively rate-limited bucket
# and 429s. Unofficial endpoint: may change, so callers must tolerate None.
CLAUDE_CRED_PATH = "/home/chaim/.claude/.credentials.json"
OAUTH_USAGE_URL = "https://api.anthropic.com/api/oauth/usage"
_USAGE_UA = "claude-code/2.1.177"
RUNTIME_DIR = "/home/chaim/halacha-drain-monitor" # state (outside repo)
# JSON file holding the previous tick's snapshot (read/written by load_state/save_state).
STATE = os.path.join(RUNTIME_DIR, "state.json")
# pm2 process name of the drain; also the drain_controls key used in the SQL below.
DRAIN = "legal-halacha-drain"
OUT_LOG = "/home/chaim/.pm2/logs/legal-halacha-drain-out.log"
ERR_LOG = "/home/chaim/.pm2/logs/legal-halacha-drain-error.log"
# Interpreter of the repo's virtualenv; runs the embedded DB scripts.
VENV_PY = os.path.join(REPO, "mcp-server/.venv/bin/python")
STUCK_SILENCE_SEC = 1500 # 25 min with no new chunk-checkpoint while online → hung
WEEKLY_GAP_HOURS = 6 # reset further than this → treat as weekly, not 5h
# Fixed UTC+3 offset (Israel summer time).
# NOTE(review): this is not display-only — tick() derives the night-window hour
# from it and next_saturday_18_utc() the burst deadline. A fixed offset ignores
# any seasonal clock change; confirm this matches how the drain interprets its
# own window before switching to a real time zone.
IDT = timezone(timedelta(hours=3))
NIGHT_START, NIGHT_END = 23, 5 # the drain's normal window (IDT hours)
def _now_utc():
    """Return the current instant as a timezone-aware UTC datetime."""
    return datetime.now(tz=timezone.utc)
def pm2_bin():
    """Resolve the pm2 executable.

    Tries ``pm2`` on PATH first, then every copy found under the nvm node
    versions, and returns the first candidate whose ``-v`` call exits with
    status 0 (the same acceptance rule claude_bin() uses). A candidate that
    cannot be launched, times out, or exits non-zero is skipped.

    Returns:
        The working candidate, or the bare name ``"pm2"`` when none works.
    """
    for candidate in ["pm2", *glob("/home/chaim/.nvm/versions/node/*/bin/pm2")]:
        try:
            probe = subprocess.run([candidate, "-v"], capture_output=True, timeout=10)
        except (OSError, subprocess.SubprocessError):
            # Not installed / not executable / timed out — try the next one.
            continue
        # A launcher that starts but fails (non-zero exit) is not usable either.
        if probe.returncode == 0:
            return candidate
    return "pm2"
def claude_bin():
    """Locate the claude CLI.

    PATH may be bare under pm2 cron, so after the plain command name the known
    install location and the nvm node versions are tried as well. The first
    candidate whose ``--version`` call exits 0 wins; otherwise the bare name
    ``"claude"`` is returned.
    """
    candidates = ["claude", "/home/chaim/.local/bin/claude"]
    candidates.extend(glob("/home/chaim/.nvm/versions/node/*/bin/claude"))
    for path in candidates:
        try:
            probe = subprocess.run([path, "--version"], capture_output=True,
                                   timeout=10)
        except Exception:
            continue
        if probe.returncode == 0:
            return path
    return "claude"
# Resolved once at import time; both resolvers fall back to the bare command name.
PM2 = pm2_bin()
CLAUDE = claude_bin()
# Environment handed to child processes: the current environment with HOME pinned.
_ENV = {**os.environ, "HOME": "/home/chaim"}
def subscription_usage() -> dict | None:
    """Read the claude.ai subscription usage from the OAuth usage endpoint.

    This is the 5-hour / 7-day utilization the Claude Code UI shows. The access
    token is read from the CLI's credentials file.

    Returns:
        The parsed JSON object (keys: five_hour, seven_day, seven_day_opus,
        seven_day_sonnet, extra_usage; each window → {utilization 0-100,
        resets_at}), or None on ANY failure — including a payload that is not a
        JSON object, since callers index the result with ``.get``.
        Undocumented endpoint — every caller must tolerate None and fall back.
    """
    try:
        with open(CLAUDE_CRED_PATH, encoding="utf-8") as f:
            token = json.load(f)["claudeAiOauth"]["accessToken"]
    except Exception:
        return None
    req = urllib.request.Request(OAUTH_USAGE_URL, headers={
        "Authorization": f"Bearer {token}",
        "User-Agent": _USAGE_UA,  # required — else aggressive 429
        "anthropic-beta": "oauth-2025-04-20",
    })
    try:
        with urllib.request.urlopen(req, timeout=15) as resp:
            payload = json.loads(resp.read().decode("utf-8"))
    except Exception:
        return None
    # Anything other than an object (list, string, number) is unusable here.
    return payload if isinstance(payload, dict) else None
def quota_available() -> bool:
    """Report whether the claude.ai quota can be used right now.

    Primary path: take the utilization figures from subscription_usage() and
    call the quota available only when the 5-hour window, the weekly all-models
    cap and the weekly-Sonnet cap are all below 100%. A missing window or a null
    utilization counts as 0%.

    Fallback path (usage endpoint returned None): run a tiny ``claude -p`` probe
    through the CLI. The probe is conservative — an exception, a non-zero exit,
    or a limit message in the output all mean "still limited"; otherwise the
    answer is whether the output contains "OK".
    """
    usage = subscription_usage()
    if usage is not None:
        # Collect every reading first, then judge them.
        readings = []
        for window in ("five_hour", "seven_day", "seven_day_sonnet"):
            entry = usage.get(window) or {}
            readings.append(entry.get("utilization"))
        for pct in readings:
            if not ((pct or 0) < 100):
                return False
        return True

    # ── fallback: official-CLI probe ──
    try:
        probe = subprocess.run([CLAUDE, "-p", "Reply with exactly: OK"],
                               capture_output=True, text=True, timeout=60,
                               env=_ENV, cwd=REPO)
    except Exception:
        return False
    if probe.returncode != 0:
        return False
    combined = (probe.stdout or "") + (probe.stderr or "")
    lowered = combined.lower()
    limit_hit = ("usage limit" in lowered
                 or "session limit" in lowered
                 or 'api_error_status":429' in combined)
    return (not limit_hit) and "OK" in combined
# ── DB access (via the repo venv; the module self-configures) ────────────────
def _venv_py(code: str, timeout: int = 120) -> str:
    """Run ``code`` with the repo virtualenv's interpreter and return its stdout.

    Args:
        code: Python source passed to the interpreter via ``-c``.
        timeout: Seconds before subprocess.run gives up.

    Returns:
        The child's stdout, stripped.

    Raises:
        RuntimeError: the child exited non-zero. The message is the LAST 300
            characters of stderr (or stdout) — a traceback ends with the actual
            exception line, so the tail is the informative part — or the exit
            code when the child printed nothing.
        subprocess.TimeoutExpired: the child exceeded ``timeout``.
    """
    r = subprocess.run([VENV_PY, "-c", code], capture_output=True, text=True,
                       cwd=REPO, timeout=timeout, env=_ENV)
    if r.returncode != 0:
        output = (r.stderr or r.stdout or "").strip()
        raise RuntimeError(output[-300:] or f"exit code {r.returncode}")
    return r.stdout.strip()
# Script executed in the repo venv by db_snapshot(); prints ONE JSON line.
# The statements using the connection `c` are nested under `async with` —
# without the deeper indent the child interpreter rejects the script.
_SNAPSHOT_SCRIPT = (
    "import sys,os,asyncio,json\n"
    "sys.path.insert(0,'mcp-server/src'); os.environ.setdefault('HOME','/home/chaim')\n"
    "from legal_mcp.services import db\n"
    "async def m():\n"
    "  pool=await db.get_pool()\n"
    "  async with pool.acquire() as c:\n"
    "    st={r['halacha_extraction_status'] or 'unknown':r['n'] for r in await c.fetch(\"SELECT halacha_extraction_status,count(*) n FROM case_law GROUP BY 1\")}\n"
    "    procs=[r['case_number'] for r in await c.fetch(\"SELECT case_number FROM case_law WHERE halacha_extraction_status='processing' ORDER BY halacha_extraction_requested_at NULLS LAST LIMIT 5\")]\n"
    "    hal=await c.fetchval('SELECT count(*) FROM halachot')\n"
    "    ck=await c.fetchval('SELECT count(*) FROM precedent_chunks WHERE halacha_extracted_at IS NOT NULL')\n"
    "    pend_rev=await c.fetchval(\"SELECT count(*) FROM halachot WHERE review_status='pending_review'\")\n"
    "    bu=await c.fetchval(\"SELECT burst_until FROM drain_controls WHERE name='legal-halacha-drain'\")\n"
    "    dis=await c.fetchval(\"SELECT disabled FROM drain_controls WHERE name='legal-halacha-drain'\")\n"
    "    print(json.dumps({'status_counts':st,'processing_cases':procs,'halachot_total':hal,'checkpointed_chunks':ck,'pending_review':pend_rev,'burst_until':bu.isoformat() if bu else None,'disabled':bool(dis)}))\n"
    "asyncio.run(m())\n"
)


def db_snapshot() -> dict:
    """Fetch queue + staging counts + burst_until + disabled in one round-trip.

    Returns:
        A dict with the keys status_counts, processing_cases, halachot_total,
        checkpointed_chunks, pending_review, burst_until (ISO string or None)
        and disabled, parsed from the last stdout line of the embedded script.

    Raises:
        RuntimeError: the embedded script exited non-zero (from _venv_py).
        IndexError / ValueError: the script printed nothing or invalid JSON.
    """
    # Only the last line is parsed, so anything printed earlier is ignored.
    return json.loads(_venv_py(_SNAPSHOT_SCRIPT).splitlines()[-1])
_SET_BURST_SQL = (
    "INSERT INTO drain_controls (name, burst_until, updated_at) "
    "VALUES ('legal-halacha-drain', $1, now()) "
    "ON CONFLICT (name) DO UPDATE SET burst_until = $1, updated_at = now()"
)


def _set_burst_script(until_iso):
    """Build the venv script that upserts burst_until (None clears it).

    Both values are embedded with ``repr`` so they arrive as Python literals.
    The ``execute`` call is nested under ``async with`` — without the deeper
    indent the child interpreter rejects the script.
    """
    return (
        "import sys,os,asyncio\n"
        "sys.path.insert(0,'mcp-server/src'); os.environ.setdefault('HOME','/home/chaim')\n"
        "from datetime import datetime\n"
        "from legal_mcp.services import db\n"
        f"v={until_iso!r}\n"
        f"SQL={_SET_BURST_SQL!r}\n"
        "async def m():\n"
        "  pool=await db.get_pool()\n"
        "  async with pool.acquire() as c:\n"
        "    await c.execute(SQL, datetime.fromisoformat(v) if v else None)\n"
        "asyncio.run(m())\n"
    )


def db_set_burst(until_iso):
    """Write the burst deadline to drain_controls.

    Args:
        until_iso: ISO-8601 deadline string, or None to clear the burst.

    Raises:
        RuntimeError: the embedded script exited non-zero (from _venv_py).
    """
    # Raw SQL via get_pool (NOT db.set_drain_burst): the host repo checkout can lag
    # main, so the supervisor depends only on get_pool + the burst_until column —
    # the column is the shared contract with the container-side /operations API.
    _venv_py(_set_burst_script(until_iso), timeout=60)
# Script executed in the repo venv by db_get_burst(); prints the ISO deadline
# or an empty line. The query is nested under `async with` — without the deeper
# indent the child interpreter rejects the script.
_GET_BURST_SCRIPT = (
    "import sys,os,asyncio\n"
    "sys.path.insert(0,'mcp-server/src'); os.environ.setdefault('HOME','/home/chaim')\n"
    "from legal_mcp.services import db\n"
    "async def m():\n"
    "  pool=await db.get_pool()\n"
    "  async with pool.acquire() as c:\n"
    "    v=await c.fetchval(\"SELECT burst_until FROM drain_controls WHERE name='legal-halacha-drain'\")\n"
    "    print(v.isoformat() if v else '')\n"
    "asyncio.run(m())\n"
)


def db_get_burst():
    """Read the burst deadline from drain_controls.

    Returns:
        The deadline as a datetime parsed from the script's ISO output, or None
        when no deadline is stored.

    Raises:
        RuntimeError: the embedded script exited non-zero (from _venv_py).
    """
    out = _venv_py(_GET_BURST_SCRIPT, timeout=60).strip()
    return datetime.fromisoformat(out) if out else None
def next_saturday_18_utc():
    """Return the upcoming Saturday 18:00 (IDT offset) as a UTC datetime.

    If it is already Saturday at or past 18:00, the following Saturday is used.
    """
    local_now = _now_utc().astimezone(IDT)
    # weekday(): Mon=0 .. Sat=5 .. Sun=6
    days_ahead = (5 - local_now.weekday()) % 7
    deadline = (local_now + timedelta(days=days_ahead)).replace(
        hour=18, minute=0, second=0, microsecond=0)
    if deadline <= local_now:
        deadline = deadline + timedelta(days=7)
    return deadline.astimezone(timezone.utc)
# ── state ────────────────────────────────────────────────────────────────────
def load_state():
    """Load the previous tick's state.

    Returns:
        The stored dict, or ``{}`` when the file is missing, unreadable, not
        valid JSON, or does not contain a JSON object (callers use ``.get`` and
        ``**`` on the result).
    """
    try:
        with open(STATE, encoding="utf-8") as f:
            data = json.load(f)
    except Exception:
        return {}
    return data if isinstance(data, dict) else {}


def save_state(s):
    """Persist state ``s`` as JSON, replacing the old file in one step.

    The data is written to a sibling ``.tmp`` file first and then moved over
    STATE with os.replace. The file is written as UTF-8 explicitly because
    ``ensure_ascii=False`` emits non-ASCII text verbatim, which must not depend
    on the process locale.
    """
    os.makedirs(RUNTIME_DIR, exist_ok=True)
    tmp = STATE + ".tmp"
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(s, f, ensure_ascii=False, indent=2)
    os.replace(tmp, STATE)
# ── pm2 / logs ────────────────────────────────────────────────────────────────
def pm2_status():
    """Return the drain's pm2 status string.

    Returns:
        The ``pm2_env.status`` of the process named DRAIN from ``pm2 jlist``
        ("unknown" when the field is missing), "absent" when no such process is
        listed, or ``"err:<exception>"`` when pm2 could not be queried/parsed.
    """
    try:
        # env=_ENV pins HOME so pm2 addresses the same pm2 home as the
        # restart issued by trigger_drain().
        r = subprocess.run([PM2, "jlist"], capture_output=True, text=True,
                           timeout=30, env=_ENV)
        for p in json.loads(r.stdout):
            if p.get("name") == DRAIN:
                return p.get("pm2_env", {}).get("status", "unknown")
    except Exception as e:
        return f"err:{e}"
    return "absent"
def log_age_sec():
    """Return how many seconds ago OUT_LOG was last modified, or None on any error."""
    try:
        current = _now_utc().timestamp()
        modified = os.path.getmtime(OUT_LOG)
    except Exception:
        return None
    return current - modified
def tail(path, n=120):
    """Return the last ``n`` lines of the text file at ``path``.

    The file is streamed through a bounded deque, so memory use is limited to
    ``n`` lines no matter how large the log has grown. Undecodable bytes are
    replaced rather than raising.

    Args:
        path: File to read.
        n: Maximum number of trailing lines to return.

    Returns:
        A list of lines (newlines kept); ``[]`` when ``n`` is not positive or
        the file cannot be read.
    """
    from collections import deque  # local import keeps this block self-contained

    if n <= 0:
        return []
    try:
        with open(path, errors="replace") as f:
            return list(deque(f, maxlen=n))
    except Exception:
        return []
def _parse_reset_time(line):
    """Return the UTC datetime named by a 'resets H:MM am/pm (UTC)' line, else None.

    The clock time is placed on today's date; if that moment is more than a
    minute in the past it is moved to tomorrow.
    """
    found = re.search(r"resets\s+(\d{1,2}):(\d{2})\s*([ap]m)\s*\(UTC\)", line, re.I)
    if not found:
        return None
    hour = int(found.group(1))
    minute = int(found.group(2))
    meridiem = found.group(3).lower()
    # 12-hour → 24-hour clock
    if meridiem == "pm" and hour != 12:
        hour += 12
    if meridiem == "am" and hour == 12:
        hour = 0
    now = _now_utc()
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now - timedelta(minutes=1):
        candidate += timedelta(days=1)
    return candidate


def scan_rate_limit():
    """Scan the drain logs for a rate-limit outcome.

    Returns:
        ``(recent_429, reset_dt_utc | None)``. ``recent_429`` is True when a
        rate-limit line appears after the last success line in the scanned
        lines; ``reset_dt_utc`` comes from the last line that announces a reset
        time (whether or not that line is itself a rate-limit line).
    """
    # NOTE(review): positions are compared across the out-log tail followed by
    # the error-log tail, so a 429 in the error log always ranks after any
    # success line in the out log — presumably acceptable because the caller
    # also requires a fresh log; confirm.
    lines = tail(OUT_LOG, 200) + tail(ERR_LOG, 120)
    idx_429 = -1
    idx_ok = -1
    reset_at = None
    for pos, text in enumerate(lines):
        limited = ('api_error_status":429' in text
                   or "hit your session limit" in text
                   or "usage limit" in text.lower())
        if limited:
            idx_429 = pos
        parsed = _parse_reset_time(text)
        if parsed is not None:
            reset_at = parsed
        if "completed stored=" in text or ("[round" in text and "processed=" in text):
            idx_ok = pos
    return (idx_429 > idx_ok and idx_429 >= 0), reset_at
# ── actions ───────────────────────────────────────────────────────────────────
def trigger_drain(win_start, win_end):
    """Restart the drain through pm2, pushing its window settings via the environment.

    Args:
        win_start: Value for HALACHA_DRAIN_WINDOW_START (stringified).
        win_end: Value for HALACHA_DRAIN_WINDOW_END (stringified).

    Returns:
        ``(ok, message)`` — ``ok`` is True when pm2 exited 0; ``message`` is the
        first 200 characters of pm2's stderr (or stdout when stderr is empty).
    """
    window_env = dict(_ENV)
    window_env["HALACHA_DRAIN_WINDOW_START"] = str(win_start)
    window_env["HALACHA_DRAIN_WINDOW_END"] = str(win_end)
    proc = subprocess.run([PM2, "restart", DRAIN, "--update-env"],
                          capture_output=True, text=True, env=window_env,
                          timeout=60)
    message = (proc.stderr or proc.stdout)[:200]
    return proc.returncode == 0, message
def stop_drain():
    """Stop the drain through pm2.

    Returns:
        True when ``pm2 stop`` exited 0, else False.
    """
    # env=_ENV pins HOME so pm2 addresses the same pm2 home as the
    # restart issued by trigger_drain().
    r = subprocess.run([PM2, "stop", DRAIN], capture_output=True, text=True,
                       timeout=60, env=_ENV)
    return r.returncode == 0
# ── the tick ──────────────────────────────────────────────────────────────────
def tick():
    """Run ONE supervisor health tick.

    Reads the DB snapshot, pm2 status and log tails, decides at most one action
    (hold / restart a hung run / trigger an idle drain / nothing), persists the
    new state with save_state() and prints a human-readable report followed by
    a machine-readable ``JSON:`` line.

    Order of precedence:
      1. DB unreadable → report ``db_error`` and return.
      2. ``disabled`` flag set → stop the drain and return.
      3. Expired burst → clear it in the DB and stop the drain.
      4. Queue empty / cooldown / hung / running / outside window / trigger.
    """
    now = _now_utc()
    prev = load_state()
    notes = []
    try:
        snap = db_snapshot()
    except Exception as e:
        save_state({**prev, "tick_at": now.isoformat(), "mode": "db_error"})
        print("⚠️ כשל בקריאת ה-DB:", e)
        print("JSON:" + json.dumps({"ok": False, "error": str(e), "action": "none",
                                    "mode": "db_error", "next_wake_sec": 900},
                                   ensure_ascii=False))
        return

    # /operations "disabled" switch — highest priority. The drain self-guards at
    # startup, but a process mid-run wouldn't notice the flag, and the cron keeps
    # firing it; so when disabled we stop any running drain and never re-trigger,
    # regardless of burst/window. (The UI toggle also stops it immediately via the
    # bridge; this is the backstop + the "don't re-ignite a disabled drain" gate.)
    if snap.get("disabled"):
        stopped = stop_drain()
        save_state({**prev, "tick_at": now.isoformat(), "mode": "disabled",
                    "action": "stopped-disabled" if stopped else "already-stopped"})
        print(f"🛑 {now.astimezone(IDT):%H:%M:%S IDT} | מצב: disabled | "
              f"מסומן לא-פעיל ב-/operations — הדריינר {'נעצר' if stopped else 'כבר עצור'}.")
        print("JSON:" + json.dumps(
            {"ok": True, "mode": "disabled",
             "action": "stopped-disabled" if stopped else "already-stopped",
             "next_wake_sec": 900}, ensure_ascii=False))
        return

    # burst state from the DB + auto-expiry (manual on; auto-off at the deadline)
    burst_until = datetime.fromisoformat(snap["burst_until"]) if snap.get("burst_until") else None
    if burst_until and now >= burst_until:
        try:
            db_set_burst(None)
        except Exception as e:
            notes.append(f"(אזהרה: ניקוי burst ב-DB נכשל: {e})")
        stop_drain()
        notes.append(f"ה-burst הסתיים ({burst_until.astimezone(IDT):%a %H:%M IDT}) — חזרה למצב לילה, הדריינר נעצר.")
        burst_until = None
    burst = bool(burst_until and now < burst_until)

    idt_hour = now.astimezone(IDT).hour
    # The night window wraps midnight, hence "or" rather than "and".
    night_open = (idt_hour >= NIGHT_START) or (idt_hour < NIGHT_END)
    window_open = burst or night_open
    # (0, 0) lifts the drain's own window while a burst is active.
    win = (0, 0) if burst else (NIGHT_START, NIGHT_END)
    phase = "burst" if burst else "night"

    sc = snap["status_counts"]
    pending = int(sc.get("pending", 0))
    processing = int(sc.get("processing", 0))
    done = int(sc.get("completed", 0))
    failed = int(sc.get("failed", 0))
    hal = snap["halachot_total"]
    ck = snap["checkpointed_chunks"]
    proc_cases = snap["processing_cases"]
    status = pm2_status()
    age = log_age_sec()
    rl_recent, reset_dt = scan_rate_limit()

    # Deltas since the previous tick (0 on the very first tick).
    d_hal = hal - prev.get("halachot_total", hal)
    d_ck = ck - prev.get("checkpointed_chunks", ck)
    d_done = done - prev.get("done", done)

    # cooldown — fresh-gated; honor a stored future reset
    fresh = (age is not None and age < 1800)
    rl_active = bool(rl_recent and fresh)
    cd_dt = None
    if rl_active and reset_dt:
        cd_dt = reset_dt
    elif prev.get("cooldown_until"):
        try:
            cd_dt = datetime.fromisoformat(prev["cooldown_until"])
        except Exception:
            cd_dt = None
    in_cooldown = bool(cd_dt and now < cd_dt)

    # Don't trust the reported reset time — re-probe. claude.ai usually frees up
    # quota EARLIER than the 429 message claims, and the old code then sat idle
    # until that (conservative) timestamp. When we'd otherwise hold, a tiny live
    # probe lets us resume the instant quota is actually back (≤ one tick), no
    # manual kick. Runs at most once per tick and only while we think we're
    # limited, so the cost is negligible.
    if in_cooldown and quota_available():
        notes.append(
            f"בדיקת-מכסה הצליחה — המכסה חזרה לפני האיפוס המדווח "
            f"({cd_dt.astimezone(IDT):%H:%M IDT}); מתחדש מיד.")
        cd_dt = None
        in_cooldown = False
    cooldown_until = cd_dt.isoformat() if cd_dt else None
    weekly = bool(cd_dt and (cd_dt - now) > timedelta(hours=WEEKLY_GAP_HOURS))

    # progress-based liveness (chunk checkpoints, NOT log mtime)
    if ck > prev.get("checkpointed_chunks", ck):
        last_progress_at = now
    else:
        lp = prev.get("last_progress_at")
        try:
            last_progress_at = datetime.fromisoformat(lp) if lp else now
        except Exception:
            last_progress_at = now
    progress_age = (now - last_progress_at).total_seconds()
    hung = (status == "online" and prev.get("last_progress_at") is not None
            and progress_age > STUCK_SILENCE_SEC)

    # ── decide ONE action ──
    action, detail, mode = "none", "", "running"
    if pending == 0 and processing == 0:
        mode = "done"
        notes.append("התור ריק — אין מה לחלץ.")
    elif in_cooldown:
        mode = "weekly_exhausted" if weekly else "ratelimited"
        action = "hold"
        notes.append(f"rate-limit פעיל; איפוס ~{cd_dt.astimezone(IDT):%H:%M IDT}.")
    elif hung:
        mode = "hung"
        ok, detail = trigger_drain(*win)
        action = "restart-hung" if ok else "restart-hung-failed"
        notes.append(f"זוהתה תקיעה ({int(progress_age)}ש' ללא checkpoint חדש) — בוצע restart.")
    elif status == "online":
        mode = "running"
        notes.append(f"רץ ({len(proc_cases)} בעיבוד, התקדמות לפני {int(progress_age)}ש').")
    elif not window_open:
        mode = "idle_off_window"
        notes.append(f"מחוץ לחלון-הלילה ({NIGHT_START:02d}:00-{NIGHT_END:02d}:00) — ממתין; {pending} בתור.")
    else:
        ok, detail = trigger_drain(*win)
        action = "triggered" if ok else "trigger-failed"
        notes.append(f"הדריינר היה בטל — הוצת ({phase})." if ok else f"הצתה נכשלה: {detail}")

    # The silence clock only means something while a run is alive. Without this
    # reset, the hours the drain spent stopped (off-window, cooldown) count as
    # "silence", so a freshly triggered or just-restarted run is judged hung on
    # the very next tick and gets restarted again and again. Restart the clock
    # whenever the drain is not online or we just (re)started it; a failed
    # restart keeps the old clock so the next tick retries. The report below
    # still shows the age measured for THIS tick.
    if status != "online" or action in ("triggered", "restart-hung"):
        last_progress_at = now

    # Cases completed since the last tick but no chunk checkpoint advanced.
    staging_ok = not (d_done > 0 and d_ck == 0)
    if not staging_ok:
        notes.append("⚠️ תיקים הושלמו אך checkpoints לא התקדמו — לבדוק staging.")

    if mode in ("ratelimited", "weekly_exhausted") and cd_dt:
        # Wake shortly after the reset, clamped to 5 min .. 1 h.
        next_wake = max(300, min(int((cd_dt - now).total_seconds()) + 180, 3600))
    elif mode in ("done", "idle_off_window"):
        next_wake = 1800
    else:
        next_wake = 900

    save_state({
        "tick_at": now.isoformat(), "phase": phase,
        "burst_until": burst_until.isoformat() if burst_until else None,
        "halachot_total": hal, "checkpointed_chunks": ck, "done": done,
        "pending": pending, "processing": processing, "processing_cases": proc_cases,
        "last_progress_at": last_progress_at.isoformat(),
        "cooldown_until": cooldown_until if in_cooldown else None,
        "mode": mode, "action": action,
    })

    elapsed = (f" (מאז הקודם: +{d_hal} הלכות, +{d_ck} מקטעים, +{d_done} תיקים)"
               if prev.get("tick_at") else "")
    bu = f" burst→{burst_until.astimezone(IDT):%a %d/%m %H:%M}" if burst else ""
    print(f"🕒 {now.astimezone(IDT):%H:%M:%S IDT} | מצב: {mode} ({phase}){bu} | פעולה: {action}")
    print(f" תור: pending={pending} processing={processing} done={done} failed={failed}")
    print(f" staging: halachot={hal} checkpointed_chunks={ck} pending_review={snap['pending_review']}{elapsed}")
    print(f" process: pm2={status} progress_age={int(progress_age)}s staging_ok={staging_ok}")
    if proc_cases:
        print(f" בעיבוד עכשיו: {', '.join(proc_cases)}")
    for n in notes:
        print(f"{n}")
    print("JSON:" + json.dumps({
        "ok": True, "mode": mode, "phase": phase, "action": action, "next_wake_sec": next_wake,
        "burst_until": burst_until.isoformat() if burst_until else None,
        "pending": pending, "processing": processing, "done": done, "failed": failed,
        "halachot_total": hal, "checkpointed_chunks": ck,
        "delta_halachot": d_hal, "delta_checkpoints": d_ck, "delta_done": d_done,
        "pm2": status, "progress_age_sec": int(progress_age),
        "staging_ok": staging_ok, "rate_limited": in_cooldown, "cooldown_until": cooldown_until,
    }, ensure_ascii=False))
def cmd_status():
    """Print the current mode, the last tick's state, pm2 status and quota. No action is taken.

    The mode line comes from the DB burst deadline; queue figures come from the
    state file written by the last tick; the quota line comes from
    subscription_usage() and degrades to a placeholder when that returns
    nothing.
    """
    bu = db_get_burst()
    now = _now_utc()
    burst = bool(bu and now < bu)
    st = load_state()
    # Derived from the constants so the label cannot drift from the real window.
    window_label = f"{NIGHT_START:02d}:00-{NIGHT_END:02d}:00"
    print(f"מצב: {'BURST' if burst else 'night'}"
          + (f" → עד {bu.astimezone(IDT):%a %d/%m %H:%M IDT}" if burst
             else f" (חלון {window_label})"))
    if st:
        print(f"טיק אחרון: {st.get('tick_at','')} | mode={st.get('mode')} action={st.get('action')}")
        print(f"תור: pending={st.get('pending')} processing={st.get('processing')} "
              f"done={st.get('done')} | staging halachot={st.get('halachot_total')}")
    print(f"דריינר pm2: {pm2_status()}")
    usage = subscription_usage()
    if usage:
        def _w(key):
            """Format one usage window as '<pct>% (reset HH:MM)'; '' when no utilization."""
            w = usage.get(key) or {}
            u = w.get("utilization")
            if u is None:
                return ""
            r = w.get("resets_at")
            rt = ""
            if r:
                try:
                    rt = f" (איפוס {datetime.fromisoformat(r).astimezone(IDT):%H:%M})"
                except Exception:
                    # Unparseable reset time — show the percentage alone.
                    rt = ""
            return f"{u:.0f}%{rt}"
        print(f"מכסת claude.ai: 5-שעות={_w('five_hour')} · שבועי={_w('seven_day')} · "
              f"שבועי-Sonnet={_w('seven_day_sonnet')}")
    else:
        print("מכסת claude.ai: (endpoint לא זמין)")
def main():
    """Command-line entry point: tick (default), burst-on, burst-off, status.

    burst-on stores a deadline (``--until``, or the upcoming Saturday 18:00)
    and runs a tick immediately; burst-off clears the deadline, stops the drain
    and runs a tick; status only prints.

    An ``--until`` value that cannot be parsed, or that is not in the future,
    is rejected as a usage error (exit status 2) before anything is written —
    a past deadline would otherwise be expired by the immediate tick, which
    stops the drain.
    """
    ap = argparse.ArgumentParser(description="halacha-drain supervisor")
    ap.add_argument("cmd", nargs="?", default="tick",
                    choices=["tick", "burst-on", "burst-off", "status"])
    ap.add_argument("--until", help="burst deadline override, ISO or 'YYYY-MM-DD HH:MM' (IDT)")
    a = ap.parse_args()
    if a.cmd == "burst-on":
        if a.until:
            try:
                until = datetime.fromisoformat(a.until)
            except ValueError:
                try:
                    until = datetime.strptime(a.until, "%Y-%m-%d %H:%M")
                except ValueError:
                    ap.error(f"invalid --until value {a.until!r}: "
                             "expected ISO or 'YYYY-MM-DD HH:MM'")
            # A deadline without an offset is interpreted in IDT.
            if until.tzinfo is None:
                until = until.replace(tzinfo=IDT)
            if until <= _now_utc():
                ap.error(f"--until {a.until!r} is not in the future")
        else:
            until = next_saturday_18_utc()
        db_set_burst(until.isoformat())
        print(f"✅ BURST הופעל — רץ עד {until.astimezone(IDT):%A %d/%m %H:%M IDT} (או burst-off).")
        tick()  # kick off immediately
    elif a.cmd == "burst-off":
        db_set_burst(None)
        stopped = stop_drain()
        print(f"🛑 BURST כובה. הדריינר {'נעצר' if stopped else 'לא נעצר (כבר עצור)'}; "
              f"חזרה למצב לילה ({NIGHT_START:02d}:00-{NIGHT_END:02d}:00).")
        tick()
    elif a.cmd == "status":
        cmd_status()
    else:
        tick()


if __name__ == "__main__":
    main()