feat(operations): manual burst control for the halacha drain + permanent supervisor (backend) #235
@@ -1501,6 +1501,16 @@ SCHEMA_V36_SQL = """
|
||||
ALTER TABLE draft_final_pairs ADD COLUMN IF NOT EXISTS learning_run JSONB;
|
||||
"""
|
||||
|
||||
SCHEMA_V37_SQL = """
|
||||
-- drain_controls.burst_until: a MANUAL, time-boxed "run continuously now" window
|
||||
-- for a drain, managed interactively from /operations. While burst_until is in the
|
||||
-- future the host supervisor (legal-halacha-supervisor) lifts the drain's
|
||||
-- night-window and keeps it draining; NULL/past = the drain's normal schedule.
|
||||
-- Chair-controlled — never set automatically. CAPTURE field on the existing
|
||||
-- control table (G2 — no parallel control path).
|
||||
ALTER TABLE drain_controls ADD COLUMN IF NOT EXISTS burst_until TIMESTAMPTZ;
|
||||
"""
|
||||
|
||||
|
||||
async def _run_schema_migrations(pool: asyncpg.Pool) -> None:
|
||||
async with pool.acquire() as conn:
|
||||
@@ -1541,7 +1551,8 @@ async def _run_schema_migrations(pool: asyncpg.Pool) -> None:
|
||||
await conn.execute(SCHEMA_V34_SQL)
|
||||
await conn.execute(SCHEMA_V35_SQL)
|
||||
await conn.execute(SCHEMA_V36_SQL)
|
||||
logger.info("Database schema initialized (v1-v36)")
|
||||
await conn.execute(SCHEMA_V37_SQL)
|
||||
logger.info("Database schema initialized (v1-v37)")
|
||||
|
||||
|
||||
async def init_schema() -> None:
|
||||
@@ -6908,3 +6919,37 @@ async def get_drain_controls() -> dict[str, bool]:
|
||||
async with pool.acquire() as conn:
|
||||
rows = await conn.fetch("SELECT name, disabled FROM drain_controls")
|
||||
return {r["name"]: bool(r["disabled"]) for r in rows}
|
||||
|
||||
|
||||
async def set_drain_burst(name: str, until) -> None:
|
||||
"""Set/clear a drain's MANUAL burst window (upsert). ``until=None`` clears it.
|
||||
|
||||
Single source of truth shared by the container-side /operations API and the
|
||||
host-side supervisor (G1) — no parallel control path (G2)."""
|
||||
pool = await get_pool()
|
||||
async with pool.acquire() as conn:
|
||||
await conn.execute(
|
||||
"INSERT INTO drain_controls (name, burst_until, updated_at) "
|
||||
"VALUES ($1, $2, now()) "
|
||||
"ON CONFLICT (name) DO UPDATE SET burst_until = $2, updated_at = now()",
|
||||
name, until,
|
||||
)
|
||||
|
||||
|
||||
async def get_drain_burst(name: str):
|
||||
"""The drain's burst_until (datetime) or None."""
|
||||
pool = await get_pool()
|
||||
async with pool.acquire() as conn:
|
||||
return await conn.fetchval(
|
||||
"SELECT burst_until FROM drain_controls WHERE name = $1", name
|
||||
)
|
||||
|
||||
|
||||
async def get_drain_bursts() -> dict[str, str]:
|
||||
"""Map of drain name → burst_until ISO string (only rows with a value set)."""
|
||||
pool = await get_pool()
|
||||
async with pool.acquire() as conn:
|
||||
rows = await conn.fetch(
|
||||
"SELECT name, burst_until FROM drain_controls WHERE burst_until IS NOT NULL"
|
||||
)
|
||||
return {r["name"]: r["burst_until"].isoformat() for r in rows}
|
||||
|
||||
@@ -110,6 +110,8 @@
|
||||
| `ingest_incoming_batch.py` | python | קליטת batch של החלטות ועדת ערר מ-`data/precedents/incoming/` דרך המסלול הקנוני (`ingest_internal_decision`) + חילוץ מטא-דאטה לכל תיק (המסלול הפנימי לא מתזמן metadata — INV-ING3). רצף (לא מקבילי, להימנע מעומס CLI). רשימת `DECISIONS` נערכת ידנית לכל batch. config מ-`~/.env`. תומך תהליך [[project_precedent_incoming_workflow]]. | ידני, per-batch (חלופה ל-MCP `internal_decision_upload` כש-batch גדול) |
|
||||
| `drain_halacha_queue.py` | python | ריקון תור חילוץ ההלכות (`process_pending_extractions kind='halacha'`) ב-batches של 4 עד שהתור ריק (2 סבבים ריקים). **רץ רק בחלון-לילה 23:00–05:00 שעון ישראל** (`_in_window`, zoneinfo DST-safe — המכונה UTC); מחוץ לחלון `===SKIP===`, ונעצר `===STOP===` כשהחלון נסגר (השאר ממשיך בלילה הבא, FIFO+checkpoint). env: `HALACHA_DRAIN_WINDOW_START`/`_END`/`HALACHA_DRAIN_TZ`. חילוץ-הלכות נשאר על claude_session (לא Gemini). self-heal ל-orphaned `processing`. ההלכות נוחתות `pending_review` (שער-יו"ר). **חילוץ תיק-בודד שהיו"ר מבקש רץ מיד דרך ה-CEO (`precedent_extract_halachot`) ואינו מגודר כאן.** | דרך `legal-halacha-drain.config.cjs` (pm2 cron) / ידני |
|
||||
| `legal-halacha-drain.config.cjs` | pm2/js | **תזמון חלון-לילה של `drain_halacha_queue.py`** (cron UTC `0 20,21,22,23,0,1,2,3 * * *` = superset שמכסה את 23:00–05:00 ישראל בקיץ ובחורף; הסקריפט גוזם לחלון המדויק ב-zoneinfo). `HALACHA_DRAIN_CRON` לעקיפה. ירייה כל שעה גם מחדשת one-shot שמת באמצע (advisory-lock הופך חפיפה לבטוחה). דורש claude CLI. התקנה: `pm2 start scripts/legal-halacha-drain.config.cjs && pm2 save`. | pm2 cron (host-side) |
|
||||
| `halacha_drain_supervisor.py` | python | **מנהל-בריאות קבוע ל-`legal-halacha-drain`** (אפס צריכת-Claude — קורא DB/לוגים/pm2 ומצית את הדריינר הקיים). טיק יחיד: מצית כשבטל+תור≠ריק · restart ל-run תקוע (liveness לפי checkpoints-per-chunk, **לא** mtime-לוג שמתעדכן רק בסיום תיק ~10 דק') · backoff ב-rate-limit (429 + parse איפוס, מגודר-טריות; `cost=0`=מנוי) · מאמת ש-staging מתחייב. **BURST** (חלון "רוץ ברצף עכשיו" ידני): מקור-אמת = `drain_controls.burst_until` ב-DB — אותו ערך ש-/operations קורא/כותב (G1 מקור-יחיד, G2 בלי מסלול מקביל); בעתיד→חלון מורם, אחרת חלון-לילה 23-05; פג-תוקף אוטומטי במועד. תת-פקודות: `tick` (ברירת-מחדל), `burst-on [--until]`, `burst-off`, `status`. | דרך `legal-halacha-supervisor.config.cjs` (pm2 cron) / ידני / כפתור /operations |
|
||||
| `legal-halacha-supervisor.config.cjs` | pm2/js | **תזמון כל 15 דק' של `halacha_drain_supervisor.py`** (cron `*/15 * * * *`, `HALACHA_SUPERVISOR_CRON` לעקיפה). `autorestart:false` (one-shot per tick). מצב-state ב-`~/halacha-drain-monitor/` (מחוץ ל-repo). התקנה: `pm2 start scripts/legal-halacha-supervisor.config.cjs && pm2 save`. | pm2 cron (host-side) |
|
||||
| `ingest_digests_batch.py` | python | קליטת batch של יומוני "כל יום" מ-`data/digests/incoming/` דרך המסלול העצמאי של קורפוס-הגילוי (`digest_library.ingest_digest`) — חילוץ-LLM (תג-מושג, כותרת-הלכה, מראה-מקום, שני-תאריכים), embedding יחיד, ו-autolink לפסק המקורי (X12/INV-DIG3). רצף (לא מקבילי). מזהה-יומון+תאריך נגזרים משם-הקובץ; העלון החודשי מדולג. **לא מעביר קבצים** — ה-DB (content_hash) הוא מקור-האמת היחיד; הרצה חוזרת מדלגת על קיימים (`exists`). config מ-`~/.env`. | ידני, per-batch (חלופה ל-MCP `digest_upload`) |
|
||||
| `drain_digests.py` | python | ריקון תור ההעשרה של יומונים (X12): מעבד כל digest בסטטוס `pending` דרך `digest_library.enrich_digest` (חילוץ-LLM Sonnet + embedding + autolink). מקבילי (CONCURRENCY=3, env-tunable), idempotent. מוסיף `~/.local/bin` ל-PATH כדי שה-claude CLI יימצא תחת cron. בודק דגל `drain_controls('legal-digest-drain')` ב-startup → no-op כשכבוי מ-/operations. | דרך `legal-digest-drain.config.cjs` (pm2 cron) + ידני אחרי backfill. חלופת-MCP: `digest_process_pending` |
|
||||
| `legal-digest-drain.config.cjs` | pm2/js | **תזמון כל שעתיים של `drain_digests.py`** (cron `0 */2 * * *`, `DIGEST_DRAIN_CRON` לעקיפה) — הועבר מ-crontab של המערכת ל-pm2 כדי שיופיע ויהיה שליט בדף `/operations` (הרץ-עכשיו/הפעל/כבה). `autorestart:false` (one-shot per tick). דורש claude CLI + `VOYAGE_API_KEY`. התקנה: `pm2 start scripts/legal-digest-drain.config.cjs && pm2 save`. | pm2 cron (host-side) |
|
||||
|
||||
421
scripts/halacha_drain_supervisor.py
Normal file
421
scripts/halacha_drain_supervisor.py
Normal file
@@ -0,0 +1,421 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Halacha-drain supervisor — permanent health manager for legal-halacha-drain.
|
||||
|
||||
Babysits the EXISTING pm2 drain so the halacha-extraction backlog gets worked
|
||||
reliably. Takes ZERO Claude quota itself (only the drain calls Opus); it just
|
||||
reads DB/logs/pm2 and pokes the drain via the established run-now mechanism.
|
||||
|
||||
Subcommands (argparse):
|
||||
tick (default — what the pm2 cron fires every 15 min) run ONE health tick
|
||||
burst-on start a daytime BURST — drain runs continuously until the upcoming
|
||||
Saturday 18:00 IDT (or --until). MANUAL ONLY; kicks off immediately.
|
||||
burst-off stop the burst now (revert to night mode) and halt the running drain
|
||||
status print current mode + queue + drain state (no action)
|
||||
|
||||
BURST is the chair's manual "run continuously now" window. Source of truth is the
|
||||
DB (`drain_controls.burst_until`, keyed by the drain's pm2 name) — the SAME value
|
||||
the /operations page reads/writes (G1, single source; G2, no parallel control
|
||||
path). burst-on/off here and the /operations toggle are equivalent front-ends.
|
||||
|
||||
One tick: reads queue + per-chunk staging + burst_until from the DB, pm2 status,
|
||||
log tails →
|
||||
• re-triggers the one-shot drain when idle and the queue is non-empty
|
||||
• restarts a HUNG run (online but no new chunk-checkpoint for > 25 min — the
|
||||
REAL liveness signal; the out-log only updates when a whole CASE finishes)
|
||||
• backs off on rate-limit (claude_session 429) until the CLI's parsed reset;
|
||||
a 429 only counts when the log is FRESH (an hours-old 429 is historical)
|
||||
• verifies crash-safe per-chunk staging is committing (nothing lost)
|
||||
|
||||
TWO MODES (never self-stops):
|
||||
• burst — while burst_until is set and in the future: window LIFTED
|
||||
(HALACHA_DRAIN_WINDOW_START==END==0), drain runs all day.
|
||||
• night — otherwise: the drain's normal 23:00–05:00 IDT window; the supervisor
|
||||
stays on as the permanent nightly-drain health manager.
|
||||
Burst auto-EXPIRES at its deadline (DB cleared + running drain stopped) so it
|
||||
never bleeds into a fresh week's quota uninvited.
|
||||
"""
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from glob import glob
|
||||
|
||||
REPO = "/home/chaim/legal-ai"
|
||||
RUNTIME_DIR = "/home/chaim/halacha-drain-monitor" # state (outside repo)
|
||||
STATE = os.path.join(RUNTIME_DIR, "state.json")
|
||||
DRAIN = "legal-halacha-drain"
|
||||
OUT_LOG = "/home/chaim/.pm2/logs/legal-halacha-drain-out.log"
|
||||
ERR_LOG = "/home/chaim/.pm2/logs/legal-halacha-drain-error.log"
|
||||
VENV_PY = os.path.join(REPO, "mcp-server/.venv/bin/python")
|
||||
|
||||
STUCK_SILENCE_SEC = 1500 # 25 min with no new chunk-checkpoint while online → hung
|
||||
WEEKLY_GAP_HOURS = 6 # reset further than this → treat as weekly, not 5h
|
||||
IDT = timezone(timedelta(hours=3)) # Israel summer time (IDT, UTC+3) — display only
|
||||
NIGHT_START, NIGHT_END = 23, 5 # the drain's normal window (IDT hours)
|
||||
|
||||
|
||||
def _now_utc():
|
||||
return datetime.now(timezone.utc)
|
||||
|
||||
|
||||
def pm2_bin():
|
||||
for c in ["pm2", *glob("/home/chaim/.nvm/versions/node/*/bin/pm2")]:
|
||||
try:
|
||||
subprocess.run([c, "-v"], capture_output=True, timeout=10)
|
||||
return c
|
||||
except Exception:
|
||||
continue
|
||||
return "pm2"
|
||||
|
||||
|
||||
PM2 = pm2_bin()
|
||||
_ENV = {**os.environ, "HOME": "/home/chaim"}
|
||||
|
||||
|
||||
# ── DB access (via the repo venv; the module self-configures) ────────────────
|
||||
def _venv_py(code: str, timeout: int = 120) -> str:
|
||||
r = subprocess.run([VENV_PY, "-c", code], capture_output=True, text=True,
|
||||
cwd=REPO, timeout=timeout, env=_ENV)
|
||||
if r.returncode != 0:
|
||||
raise RuntimeError((r.stderr or r.stdout)[:300])
|
||||
return r.stdout.strip()
|
||||
|
||||
|
||||
def db_snapshot() -> dict:
|
||||
"""Queue + staging counts + burst_until, in one round-trip."""
|
||||
code = (
|
||||
"import sys,os,asyncio,json\n"
|
||||
"sys.path.insert(0,'mcp-server/src'); os.environ.setdefault('HOME','/home/chaim')\n"
|
||||
"from legal_mcp.services import db\n"
|
||||
"async def m():\n"
|
||||
" pool=await db.get_pool()\n"
|
||||
" async with pool.acquire() as c:\n"
|
||||
" st={r['halacha_extraction_status'] or 'unknown':r['n'] for r in await c.fetch(\"SELECT halacha_extraction_status,count(*) n FROM case_law GROUP BY 1\")}\n"
|
||||
" procs=[r['case_number'] for r in await c.fetch(\"SELECT case_number FROM case_law WHERE halacha_extraction_status='processing' ORDER BY halacha_extraction_requested_at NULLS LAST LIMIT 5\")]\n"
|
||||
" hal=await c.fetchval('SELECT count(*) FROM halachot')\n"
|
||||
" ck=await c.fetchval('SELECT count(*) FROM precedent_chunks WHERE halacha_extracted_at IS NOT NULL')\n"
|
||||
" pend_rev=await c.fetchval(\"SELECT count(*) FROM halachot WHERE review_status='pending_review'\")\n"
|
||||
" bu=await c.fetchval(\"SELECT burst_until FROM drain_controls WHERE name='legal-halacha-drain'\")\n"
|
||||
" print(json.dumps({'status_counts':st,'processing_cases':procs,'halachot_total':hal,'checkpointed_chunks':ck,'pending_review':pend_rev,'burst_until':bu.isoformat() if bu else None}))\n"
|
||||
"asyncio.run(m())\n"
|
||||
)
|
||||
return json.loads(_venv_py(code).splitlines()[-1])
|
||||
|
||||
|
||||
def db_set_burst(until_iso):
|
||||
code = (
|
||||
"import sys,os,asyncio\n"
|
||||
"sys.path.insert(0,'mcp-server/src'); os.environ.setdefault('HOME','/home/chaim')\n"
|
||||
"from datetime import datetime\n"
|
||||
"from legal_mcp.services import db\n"
|
||||
f"v={until_iso!r}\n"
|
||||
"async def m():\n"
|
||||
" await db.set_drain_burst('legal-halacha-drain', datetime.fromisoformat(v) if v else None)\n"
|
||||
"asyncio.run(m())\n"
|
||||
)
|
||||
_venv_py(code, timeout=60)
|
||||
|
||||
|
||||
def db_get_burst():
|
||||
code = (
|
||||
"import sys,os,asyncio\n"
|
||||
"sys.path.insert(0,'mcp-server/src'); os.environ.setdefault('HOME','/home/chaim')\n"
|
||||
"from legal_mcp.services import db\n"
|
||||
"async def m():\n"
|
||||
" v=await db.get_drain_burst('legal-halacha-drain')\n"
|
||||
" print(v.isoformat() if v else '')\n"
|
||||
"asyncio.run(m())\n"
|
||||
)
|
||||
out = _venv_py(code, timeout=60).strip()
|
||||
return datetime.fromisoformat(out) if out else None
|
||||
|
||||
|
||||
def next_saturday_18_utc():
|
||||
now_idt = _now_utc().astimezone(IDT)
|
||||
days = (5 - now_idt.weekday()) % 7 # Mon=0..Sat=5..Sun=6
|
||||
cand = now_idt.replace(hour=18, minute=0, second=0, microsecond=0) + timedelta(days=days)
|
||||
if cand <= now_idt:
|
||||
cand += timedelta(days=7)
|
||||
return cand.astimezone(timezone.utc)
|
||||
|
||||
|
||||
# ── state ────────────────────────────────────────────────────────────────────
|
||||
def load_state():
|
||||
try:
|
||||
with open(STATE) as f:
|
||||
return json.load(f)
|
||||
except Exception:
|
||||
return {}
|
||||
|
||||
|
||||
def save_state(s):
|
||||
os.makedirs(RUNTIME_DIR, exist_ok=True)
|
||||
tmp = STATE + ".tmp"
|
||||
with open(tmp, "w") as f:
|
||||
json.dump(s, f, ensure_ascii=False, indent=2)
|
||||
os.replace(tmp, STATE)
|
||||
|
||||
|
||||
# ── pm2 / logs ────────────────────────────────────────────────────────────────
|
||||
def pm2_status():
|
||||
try:
|
||||
r = subprocess.run([PM2, "jlist"], capture_output=True, text=True, timeout=30)
|
||||
for p in json.loads(r.stdout):
|
||||
if p.get("name") == DRAIN:
|
||||
return p.get("pm2_env", {}).get("status", "unknown")
|
||||
except Exception as e:
|
||||
return f"err:{e}"
|
||||
return "absent"
|
||||
|
||||
|
||||
def log_age_sec():
|
||||
try:
|
||||
return _now_utc().timestamp() - os.path.getmtime(OUT_LOG)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def tail(path, n=120):
|
||||
try:
|
||||
with open(path, errors="replace") as f:
|
||||
return f.readlines()[-n:]
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
|
||||
def scan_rate_limit():
|
||||
"""(recent_429, reset_dt_utc|None) — a 429 counts as recent only if it is the
|
||||
LAST drain outcome (after the last 'completed' line)."""
|
||||
lines = tail(OUT_LOG, 200) + tail(ERR_LOG, 120)
|
||||
last_429, last_ok, reset_dt = -1, -1, None
|
||||
for i, ln in enumerate(lines):
|
||||
if 'api_error_status":429' in ln or "hit your session limit" in ln or "usage limit" in ln.lower():
|
||||
last_429 = i
|
||||
m = re.search(r"resets\s+(\d{1,2}):(\d{2})\s*([ap]m)\s*\(UTC\)", ln, re.I)
|
||||
if m:
|
||||
hh, mm, ap = int(m.group(1)), int(m.group(2)), m.group(3).lower()
|
||||
if ap == "pm" and hh != 12:
|
||||
hh += 12
|
||||
if ap == "am" and hh == 12:
|
||||
hh = 0
|
||||
now = _now_utc()
|
||||
cand = now.replace(hour=hh, minute=mm, second=0, microsecond=0)
|
||||
if cand <= now - timedelta(minutes=1):
|
||||
cand += timedelta(days=1)
|
||||
reset_dt = cand
|
||||
if "completed stored=" in ln or ("[round" in ln and "processed=" in ln):
|
||||
last_ok = i
|
||||
return (last_429 > last_ok and last_429 >= 0), reset_dt
|
||||
|
||||
|
||||
# ── actions ───────────────────────────────────────────────────────────────────
|
||||
def trigger_drain(win_start, win_end):
|
||||
"""Restart the one-shot drain, pushing its window env (idempotent, reboot-safe)."""
|
||||
env = {**_ENV, "HALACHA_DRAIN_WINDOW_START": str(win_start),
|
||||
"HALACHA_DRAIN_WINDOW_END": str(win_end)}
|
||||
r = subprocess.run([PM2, "restart", DRAIN, "--update-env"],
|
||||
capture_output=True, text=True, env=env, timeout=60)
|
||||
return r.returncode == 0, (r.stderr or r.stdout)[:200]
|
||||
|
||||
|
||||
def stop_drain():
|
||||
r = subprocess.run([PM2, "stop", DRAIN], capture_output=True, text=True, timeout=60)
|
||||
return r.returncode == 0
|
||||
|
||||
|
||||
# ── the tick ──────────────────────────────────────────────────────────────────
|
||||
def tick():
|
||||
now = _now_utc()
|
||||
prev = load_state()
|
||||
notes = []
|
||||
|
||||
try:
|
||||
snap = db_snapshot()
|
||||
except Exception as e:
|
||||
save_state({**prev, "tick_at": now.isoformat(), "mode": "db_error"})
|
||||
print("⚠️ כשל בקריאת ה-DB:", e)
|
||||
print("JSON:" + json.dumps({"ok": False, "error": str(e), "action": "none",
|
||||
"mode": "db_error", "next_wake_sec": 900}, ensure_ascii=False))
|
||||
return
|
||||
|
||||
# burst state from the DB + auto-expiry (manual on; auto-off at the deadline)
|
||||
burst_until = datetime.fromisoformat(snap["burst_until"]) if snap.get("burst_until") else None
|
||||
if burst_until and now >= burst_until:
|
||||
try:
|
||||
db_set_burst(None)
|
||||
except Exception as e:
|
||||
notes.append(f"(אזהרה: ניקוי burst ב-DB נכשל: {e})")
|
||||
stop_drain()
|
||||
notes.append(f"ה-burst הסתיים ({burst_until.astimezone(IDT):%a %H:%M IDT}) — חזרה למצב לילה, הדריינר נעצר.")
|
||||
burst_until = None
|
||||
burst = bool(burst_until and now < burst_until)
|
||||
|
||||
idt_hour = now.astimezone(IDT).hour
|
||||
night_open = (idt_hour >= NIGHT_START) or (idt_hour < NIGHT_END)
|
||||
window_open = burst or night_open
|
||||
win = (0, 0) if burst else (NIGHT_START, NIGHT_END)
|
||||
phase = "burst" if burst else "night"
|
||||
|
||||
sc = snap["status_counts"]
|
||||
pending = int(sc.get("pending", 0))
|
||||
processing = int(sc.get("processing", 0))
|
||||
done = int(sc.get("completed", 0))
|
||||
failed = int(sc.get("failed", 0))
|
||||
hal = snap["halachot_total"]
|
||||
ck = snap["checkpointed_chunks"]
|
||||
proc_cases = snap["processing_cases"]
|
||||
|
||||
status = pm2_status()
|
||||
age = log_age_sec()
|
||||
rl_recent, reset_dt = scan_rate_limit()
|
||||
|
||||
d_hal = hal - prev.get("halachot_total", hal)
|
||||
d_ck = ck - prev.get("checkpointed_chunks", ck)
|
||||
d_done = done - prev.get("done", done)
|
||||
|
||||
# cooldown — fresh-gated; honor a stored future reset
|
||||
fresh = (age is not None and age < 1800)
|
||||
rl_active = bool(rl_recent and fresh)
|
||||
cd_dt = None
|
||||
if rl_active and reset_dt:
|
||||
cd_dt = reset_dt
|
||||
elif prev.get("cooldown_until"):
|
||||
try:
|
||||
cd_dt = datetime.fromisoformat(prev["cooldown_until"])
|
||||
except Exception:
|
||||
cd_dt = None
|
||||
cooldown_until = cd_dt.isoformat() if cd_dt else None
|
||||
in_cooldown = bool(cd_dt and now < cd_dt)
|
||||
weekly = bool(cd_dt and (cd_dt - now) > timedelta(hours=WEEKLY_GAP_HOURS))
|
||||
|
||||
# progress-based liveness (chunk checkpoints, NOT log mtime)
|
||||
if ck > prev.get("checkpointed_chunks", ck):
|
||||
last_progress_at = now
|
||||
else:
|
||||
lp = prev.get("last_progress_at")
|
||||
try:
|
||||
last_progress_at = datetime.fromisoformat(lp) if lp else now
|
||||
except Exception:
|
||||
last_progress_at = now
|
||||
progress_age = (now - last_progress_at).total_seconds()
|
||||
hung = (status == "online" and prev.get("last_progress_at") is not None
|
||||
and progress_age > STUCK_SILENCE_SEC)
|
||||
|
||||
# ── decide ONE action ──
|
||||
action, detail, mode = "none", "", "running"
|
||||
if pending == 0 and processing == 0:
|
||||
mode = "done"
|
||||
notes.append("התור ריק — אין מה לחלץ.")
|
||||
elif in_cooldown:
|
||||
mode = "weekly_exhausted" if weekly else "ratelimited"
|
||||
action = "hold"
|
||||
notes.append(f"rate-limit פעיל; איפוס ~{cd_dt.astimezone(IDT):%H:%M IDT}.")
|
||||
elif hung:
|
||||
mode = "hung"
|
||||
ok, detail = trigger_drain(*win)
|
||||
action = "restart-hung" if ok else "restart-hung-failed"
|
||||
notes.append(f"זוהתה תקיעה ({int(progress_age)}ש' ללא checkpoint חדש) — בוצע restart.")
|
||||
elif status == "online":
|
||||
mode = "running"
|
||||
notes.append(f"רץ ({len(proc_cases)} בעיבוד, התקדמות לפני {int(progress_age)}ש').")
|
||||
elif not window_open:
|
||||
mode = "idle_off_window"
|
||||
notes.append(f"מחוץ לחלון-הלילה ({NIGHT_START}:00–0{NIGHT_END}:00) — ממתין; {pending} בתור.")
|
||||
else:
|
||||
ok, detail = trigger_drain(*win)
|
||||
action = "triggered" if ok else "trigger-failed"
|
||||
notes.append(f"הדריינר היה בטל — הוצת ({phase})." if ok else f"הצתה נכשלה: {detail}")
|
||||
|
||||
staging_ok = not (d_done > 0 and d_ck == 0)
|
||||
if not staging_ok:
|
||||
notes.append("⚠️ תיקים הושלמו אך checkpoints לא התקדמו — לבדוק staging.")
|
||||
|
||||
if mode in ("ratelimited", "weekly_exhausted") and cd_dt:
|
||||
next_wake = max(300, min(int((cd_dt - now).total_seconds()) + 180, 3600))
|
||||
elif mode in ("done", "idle_off_window"):
|
||||
next_wake = 1800
|
||||
else:
|
||||
next_wake = 900
|
||||
|
||||
save_state({
|
||||
"tick_at": now.isoformat(), "phase": phase,
|
||||
"burst_until": burst_until.isoformat() if burst_until else None,
|
||||
"halachot_total": hal, "checkpointed_chunks": ck, "done": done,
|
||||
"pending": pending, "processing": processing, "processing_cases": proc_cases,
|
||||
"last_progress_at": last_progress_at.isoformat(),
|
||||
"cooldown_until": cooldown_until if in_cooldown else None,
|
||||
"mode": mode, "action": action,
|
||||
})
|
||||
|
||||
elapsed = (f" (מאז הקודם: +{d_hal} הלכות, +{d_ck} מקטעים, +{d_done} תיקים)"
|
||||
if prev.get("tick_at") else "")
|
||||
bu = f" burst→{burst_until.astimezone(IDT):%a %d/%m %H:%M}" if burst else ""
|
||||
print(f"🕒 {now.astimezone(IDT):%H:%M:%S IDT} | מצב: {mode} ({phase}){bu} | פעולה: {action}")
|
||||
print(f" תור: pending={pending} processing={processing} done={done} failed={failed}")
|
||||
print(f" staging: halachot={hal} checkpointed_chunks={ck} pending_review={snap['pending_review']}{elapsed}")
|
||||
print(f" process: pm2={status} progress_age={int(progress_age)}s staging_ok={staging_ok}")
|
||||
if proc_cases:
|
||||
print(f" בעיבוד עכשיו: {', '.join(proc_cases)}")
|
||||
for n in notes:
|
||||
print(f" • {n}")
|
||||
print("JSON:" + json.dumps({
|
||||
"ok": True, "mode": mode, "phase": phase, "action": action, "next_wake_sec": next_wake,
|
||||
"burst_until": burst_until.isoformat() if burst_until else None,
|
||||
"pending": pending, "processing": processing, "done": done, "failed": failed,
|
||||
"halachot_total": hal, "checkpointed_chunks": ck,
|
||||
"delta_halachot": d_hal, "delta_checkpoints": d_ck, "delta_done": d_done,
|
||||
"pm2": status, "progress_age_sec": int(progress_age),
|
||||
"staging_ok": staging_ok, "rate_limited": in_cooldown, "cooldown_until": cooldown_until,
|
||||
}, ensure_ascii=False))
|
||||
|
||||
|
||||
def cmd_status():
|
||||
bu = db_get_burst()
|
||||
now = _now_utc()
|
||||
burst = bool(bu and now < bu)
|
||||
st = load_state()
|
||||
print(f"מצב: {'BURST' if burst else 'night'}"
|
||||
+ (f" → עד {bu.astimezone(IDT):%a %d/%m %H:%M IDT}" if burst else " (חלון 23:00–05:00)"))
|
||||
if st:
|
||||
print(f"טיק אחרון: {st.get('tick_at','—')} | mode={st.get('mode')} action={st.get('action')}")
|
||||
print(f"תור: pending={st.get('pending')} processing={st.get('processing')} "
|
||||
f"done={st.get('done')} | staging halachot={st.get('halachot_total')}")
|
||||
print(f"דריינר pm2: {pm2_status()}")
|
||||
|
||||
|
||||
def main():
|
||||
ap = argparse.ArgumentParser(description="halacha-drain supervisor")
|
||||
ap.add_argument("cmd", nargs="?", default="tick",
|
||||
choices=["tick", "burst-on", "burst-off", "status"])
|
||||
ap.add_argument("--until", help="burst deadline override, ISO or 'YYYY-MM-DD HH:MM' (IDT)")
|
||||
a = ap.parse_args()
|
||||
|
||||
if a.cmd == "burst-on":
|
||||
if a.until:
|
||||
try:
|
||||
until = datetime.fromisoformat(a.until)
|
||||
except ValueError:
|
||||
until = datetime.strptime(a.until, "%Y-%m-%d %H:%M")
|
||||
if until.tzinfo is None:
|
||||
until = until.replace(tzinfo=IDT)
|
||||
else:
|
||||
until = next_saturday_18_utc()
|
||||
db_set_burst(until.isoformat())
|
||||
print(f"✅ BURST הופעל — רץ עד {until.astimezone(IDT):%A %d/%m %H:%M IDT} (או burst-off).")
|
||||
tick() # kick off immediately
|
||||
elif a.cmd == "burst-off":
|
||||
db_set_burst(None)
|
||||
stopped = stop_drain()
|
||||
print(f"🛑 BURST כובה. הדריינר {'נעצר' if stopped else 'לא נעצר (כבר עצור)'}; "
|
||||
f"חזרה למצב לילה (23:00–05:00).")
|
||||
tick()
|
||||
elif a.cmd == "status":
|
||||
cmd_status()
|
||||
else:
|
||||
tick()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
45
scripts/legal-halacha-supervisor.config.cjs
Normal file
45
scripts/legal-halacha-supervisor.config.cjs
Normal file
@@ -0,0 +1,45 @@
|
||||
/**
|
||||
* pm2 ecosystem entry for legal-halacha-supervisor — the permanent health
|
||||
* manager for the halacha-extraction drain (legal-halacha-drain).
|
||||
*
|
||||
* Runs ONE health tick per cron fire (~every 15 min) and takes ZERO Claude quota
|
||||
* itself (only the drain calls Opus); it just reads the DB/logs/pm2 and pokes the
|
||||
* existing drain via the established run-now mechanism. Each tick:
|
||||
* • re-triggers the one-shot drain when idle and the queue is non-empty
|
||||
* • restarts a HUNG run (online but no new chunk-checkpoint > 25 min — the
|
||||
* out-log only updates when a whole CASE finishes, so mtime is not liveness)
|
||||
* • backs off on rate-limit (claude_session 429) until the CLI's parsed reset
|
||||
* • verifies crash-safe per-chunk staging is committing (nothing lost)
|
||||
*
|
||||
* BURST (manual "run continuously now" window): source of truth is
|
||||
* drain_controls.burst_until in the DB — the SAME value the /operations page
|
||||
* reads/writes (G1 single source; G2 no parallel control path). While it is in
|
||||
* the future the supervisor LIFTS the drain's night-window; otherwise the drain
|
||||
* keeps its normal 23:00–05:00 IDT window. Burst auto-expires at its deadline.
|
||||
* Manual front-ends: the /operations toggle, or:
|
||||
* mcp-server/.venv/bin/python scripts/halacha_drain_supervisor.py burst-on
|
||||
* mcp-server/.venv/bin/python scripts/halacha_drain_supervisor.py burst-off
|
||||
*
|
||||
* Pattern (mirrors legal-halacha-drain): cron_restart fires the script;
|
||||
* autorestart:false → one-shot per tick (pm2 shows "stopped" between ticks).
|
||||
*
|
||||
* Install (once):
|
||||
* pm2 start /home/chaim/legal-ai/scripts/legal-halacha-supervisor.config.cjs
|
||||
* pm2 save
|
||||
*/
|
||||
const cron = process.env.HALACHA_SUPERVISOR_CRON || "*/15 * * * *";
|
||||
|
||||
module.exports = {
|
||||
apps: [
|
||||
{
|
||||
name: "legal-halacha-supervisor",
|
||||
cwd: "/home/chaim/legal-ai",
|
||||
script: "/home/chaim/legal-ai/mcp-server/.venv/bin/python",
|
||||
args: "scripts/halacha_drain_supervisor.py",
|
||||
env: { HOME: "/home/chaim", PYTHONUNBUFFERED: "1" },
|
||||
autorestart: false, // one-shot per cron tick
|
||||
cron_restart: cron,
|
||||
max_memory_restart: "300M",
|
||||
},
|
||||
],
|
||||
};
|
||||
49
web/app.py
49
web/app.py
@@ -6637,8 +6637,10 @@ async def operations_snapshot():
|
||||
|
||||
pm2 = await _ops_pm2_services()
|
||||
controls = await db.get_drain_controls()
|
||||
bursts = await db.get_drain_bursts()
|
||||
for svc in pm2["services"]:
|
||||
svc["disabled"] = controls.get(svc.get("name", ""), False)
|
||||
svc["burst_until"] = bursts.get(svc.get("name", ""))
|
||||
|
||||
def _iso(rows: list[dict]) -> list[dict]:
|
||||
for d in rows:
|
||||
@@ -6717,6 +6719,53 @@ async def operations_drain_toggle(name: str, body: dict = Body(...)):
|
||||
return {"ok": True, "name": name, "disabled": disabled}
|
||||
|
||||
|
||||
def _next_saturday_18_il() -> datetime:
|
||||
"""Upcoming Saturday 18:00 Israel time (DST-safe)."""
|
||||
from datetime import timedelta
|
||||
from zoneinfo import ZoneInfo
|
||||
il = ZoneInfo("Asia/Jerusalem")
|
||||
now = datetime.now(il)
|
||||
days = (5 - now.weekday()) % 7 # Mon=0 .. Sat=5 .. Sun=6
|
||||
cand = now.replace(hour=18, minute=0, second=0, microsecond=0) + timedelta(days=days)
|
||||
if cand <= now:
|
||||
cand += timedelta(days=7)
|
||||
return cand
|
||||
|
||||
|
||||
@app.post("/api/operations/drains/{name}/burst")
|
||||
async def operations_drain_burst(name: str, body: dict = Body(...)):
|
||||
"""Start/stop a drain's MANUAL burst window (chair-controlled, from /operations).
|
||||
|
||||
``action='on'`` → ``burst_until`` = body ``until`` (ISO) or the upcoming
|
||||
Saturday 18:00 Israel time. ``action='off'`` → NULL. The host supervisor
|
||||
(legal-halacha-supervisor) reads this from the DB and lifts/restores the
|
||||
drain's window accordingly (takes effect within one supervisor tick, ≤15 min).
|
||||
Never set automatically — manual only."""
|
||||
if not name.startswith("legal-"):
|
||||
raise HTTPException(403, "ניתן לשלוט רק בשירותי legal-*")
|
||||
action = (body.get("action") or "").lower()
|
||||
if action == "off":
|
||||
await db.set_drain_burst(name, None)
|
||||
return {"ok": True, "name": name, "burst_until": None}
|
||||
if action == "on":
|
||||
until = body.get("until")
|
||||
if until:
|
||||
try:
|
||||
until_dt = datetime.fromisoformat(until)
|
||||
except (ValueError, TypeError):
|
||||
raise HTTPException(400, "until חייב להיות ISO-8601")
|
||||
if until_dt.tzinfo is None:
|
||||
from zoneinfo import ZoneInfo
|
||||
until_dt = until_dt.replace(tzinfo=ZoneInfo("Asia/Jerusalem"))
|
||||
else:
|
||||
until_dt = _next_saturday_18_il()
|
||||
if until_dt <= datetime.now(timezone.utc):
|
||||
raise HTTPException(400, "until חייב להיות בעתיד")
|
||||
await db.set_drain_burst(name, until_dt)
|
||||
return {"ok": True, "name": name, "burst_until": until_dt.isoformat()}
|
||||
raise HTTPException(400, "action חייב להיות on|off")
|
||||
|
||||
|
||||
# ── Live agents (/operations "סוכנים פעילים") ──────────────────────────────
|
||||
# What the pm2/queue panels can't show: WHICH agent is doing the work right now
|
||||
# and its live output. An agent-driven drain (e.g. the CEO heartbeat draining
|
||||
|
||||
Reference in New Issue
Block a user