From c7c402e7efa3b1d2fa144cd1187486d251f0ad7a Mon Sep 17 00:00:00 2001 From: Chaim Date: Fri, 12 Jun 2026 11:11:13 +0000 Subject: [PATCH] feat(operations): manual burst control for the halacha drain + permanent supervisor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The halacha-extraction backlog needs to be worked off the chair's leftover weekly Claude quota on demand. This adds a MANUAL, time-boxed "burst" — run the drain continuously now until a chosen deadline (default the upcoming Saturday 18:00 IL), managed interactively from /operations — plus the permanent health-supervisor that enforces it. Backend (this PR; deploys via Coolify + host pm2): - db: drain_controls.burst_until (SCHEMA_V37) + set_drain_burst/get_drain_burst/ get_drain_bursts. Single source of truth shared by the container-side /operations API and the host-side supervisor. - web: POST /api/operations/drains/{name}/burst (on→until|next-Sat-18:00, off→NULL), and burst_until surfaced per-service in the /operations snapshot. - scripts/halacha_drain_supervisor.py + legal-halacha-supervisor.config.cjs: pm2 cron (*/15, zero Claude quota) — re-triggers idle drain, restarts a HUNG run (liveness = per-chunk checkpoints, NOT log mtime), backs off on 429 until the parsed reset (fresh-gated), verifies crash-safe staging. Reads burst_until from the DB; burst auto-expires at the deadline (never bleeds into a fresh week). UI (separate follow-up PR, after Claude Design approval): the /operations toggle + date-picker that calls the burst endpoint. Invariants: G1 (normalize at source — burst lives once in the DB, read by both surfaces), G2 (no parallel control path — CAPTURE field on the existing drain_controls + orchestrates the existing drain, not a new one), G12 (no Paperclip touch), §6 (no silent error-swallow — burst-clear failure is surfaced as a note). --- mcp-server/src/legal_mcp/services/db.py | 47 ++- scripts/SCRIPTS.md | 2 + scripts/halacha_drain_supervisor.py | 421 ++++++++++++++++++++ scripts/legal-halacha-supervisor.config.cjs | 45 +++ web/app.py | 49 +++ 5 files changed, 563 insertions(+), 1 deletion(-) create mode 100644 scripts/halacha_drain_supervisor.py create mode 100644 scripts/legal-halacha-supervisor.config.cjs diff --git a/mcp-server/src/legal_mcp/services/db.py b/mcp-server/src/legal_mcp/services/db.py index b15fbbe..3848a7b 100644 --- a/mcp-server/src/legal_mcp/services/db.py +++ b/mcp-server/src/legal_mcp/services/db.py @@ -1501,6 +1501,16 @@ SCHEMA_V36_SQL = """ ALTER TABLE draft_final_pairs ADD COLUMN IF NOT EXISTS learning_run JSONB; """ +SCHEMA_V37_SQL = """ +-- drain_controls.burst_until: a MANUAL, time-boxed "run continuously now" window +-- for a drain, managed interactively from /operations. While burst_until is in the +-- future the host supervisor (legal-halacha-supervisor) lifts the drain's +-- night-window and keeps it draining; NULL/past = the drain's normal schedule. +-- Chair-controlled — never set automatically. CAPTURE field on the existing +-- control table (G2 — no parallel control path). +ALTER TABLE drain_controls ADD COLUMN IF NOT EXISTS burst_until TIMESTAMPTZ; +""" + async def _run_schema_migrations(pool: asyncpg.Pool) -> None: async with pool.acquire() as conn: @@ -1541,7 +1551,8 @@ async def _run_schema_migrations(pool: asyncpg.Pool) -> None: await conn.execute(SCHEMA_V34_SQL) await conn.execute(SCHEMA_V35_SQL) await conn.execute(SCHEMA_V36_SQL) - logger.info("Database schema initialized (v1-v36)") + await conn.execute(SCHEMA_V37_SQL) + logger.info("Database schema initialized (v1-v37)") async def init_schema() -> None: @@ -6908,3 +6919,37 @@ async def get_drain_controls() -> dict[str, bool]: async with pool.acquire() as conn: rows = await conn.fetch("SELECT name, disabled FROM drain_controls") return {r["name"]: bool(r["disabled"]) for r in rows} + + +async def set_drain_burst(name: str, until) -> None: + """Set/clear a drain's MANUAL burst window (upsert). ``until=None`` clears it. + + Single source of truth shared by the container-side /operations API and the + host-side supervisor (G1) — no parallel control path (G2).""" + pool = await get_pool() + async with pool.acquire() as conn: + await conn.execute( + "INSERT INTO drain_controls (name, burst_until, updated_at) " + "VALUES ($1, $2, now()) " + "ON CONFLICT (name) DO UPDATE SET burst_until = $2, updated_at = now()", + name, until, + ) + + +async def get_drain_burst(name: str): + """The drain's burst_until (datetime) or None.""" + pool = await get_pool() + async with pool.acquire() as conn: + return await conn.fetchval( + "SELECT burst_until FROM drain_controls WHERE name = $1", name + ) + + +async def get_drain_bursts() -> dict[str, str]: + """Map of drain name → burst_until ISO string (only rows with a value set).""" + pool = await get_pool() + async with pool.acquire() as conn: + rows = await conn.fetch( + "SELECT name, burst_until FROM drain_controls WHERE burst_until IS NOT NULL" + ) + return {r["name"]: r["burst_until"].isoformat() for r in rows} diff --git a/scripts/SCRIPTS.md b/scripts/SCRIPTS.md index 75f9e6d..81c9619 100644 --- a/scripts/SCRIPTS.md +++ b/scripts/SCRIPTS.md @@ -110,6 +110,8 @@ | `ingest_incoming_batch.py` | python | קליטת batch של החלטות ועדת ערר מ-`data/precedents/incoming/` דרך המסלול הקנוני (`ingest_internal_decision`) + חילוץ מטא-דאטה לכל תיק (המסלול הפנימי לא מתזמן metadata — INV-ING3). רצף (לא מקבילי, להימנע מעומס CLI). רשימת `DECISIONS` נערכת ידנית לכל batch. config מ-`~/.env`. תומך תהליך [[project_precedent_incoming_workflow]]. | ידני, per-batch (חלופה ל-MCP `internal_decision_upload` כש-batch גדול) | | `drain_halacha_queue.py` | python | ריקון תור חילוץ ההלכות (`process_pending_extractions kind='halacha'`) ב-batches של 4 עד שהתור ריק (2 סבבים ריקים). **רץ רק בחלון-לילה 23:00–05:00 שעון ישראל** (`_in_window`, zoneinfo DST-safe — המכונה UTC); מחוץ לחלון `===SKIP===`, ונעצר `===STOP===` כשהחלון נסגר (השאר ממשיך בלילה הבא, FIFO+checkpoint). env: `HALACHA_DRAIN_WINDOW_START`/`_END`/`HALACHA_DRAIN_TZ`. חילוץ-הלכות נשאר על claude_session (לא Gemini). self-heal ל-orphaned `processing`. ההלכות נוחתות `pending_review` (שער-יו"ר). **חילוץ תיק-בודד שהיו"ר מבקש רץ מיד דרך ה-CEO (`precedent_extract_halachot`) ואינו מגודר כאן.** | דרך `legal-halacha-drain.config.cjs` (pm2 cron) / ידני | | `legal-halacha-drain.config.cjs` | pm2/js | **תזמון חלון-לילה של `drain_halacha_queue.py`** (cron UTC `0 20,21,22,23,0,1,2,3 * * *` = superset שמכסה את 23:00–05:00 ישראל בקיץ ובחורף; הסקריפט גוזם לחלון המדויק ב-zoneinfo). `HALACHA_DRAIN_CRON` לעקיפה. ירייה כל שעה גם מחדשת one-shot שמת באמצע (advisory-lock הופך חפיפה לבטוחה). דורש claude CLI. התקנה: `pm2 start scripts/legal-halacha-drain.config.cjs && pm2 save`. | pm2 cron (host-side) | +| `halacha_drain_supervisor.py` | python | **מנהל-בריאות קבוע ל-`legal-halacha-drain`** (אפס צריכת-Claude — קורא DB/לוגים/pm2 ומצית את הדריינר הקיים). טיק יחיד: מצית כשבטל+תור≠ריק · restart ל-run תקוע (liveness לפי checkpoints-per-chunk, **לא** mtime-לוג שמתעדכן רק בסיום תיק ~10 דק') · backoff ב-rate-limit (429 + parse איפוס, מגודר-טריות; `cost=0`=מנוי) · מאמת ש-staging מתחייב. **BURST** (חלון "רוץ ברצף עכשיו" ידני): מקור-אמת = `drain_controls.burst_until` ב-DB — אותו ערך ש-/operations קורא/כותב (G1 מקור-יחיד, G2 בלי מסלול מקביל); בעתיד→חלון מורם, אחרת חלון-לילה 23-05; פג-תוקף אוטומטי במועד. תת-פקודות: `tick` (ברירת-מחדל), `burst-on [--until]`, `burst-off`, `status`. | דרך `legal-halacha-supervisor.config.cjs` (pm2 cron) / ידני / כפתור /operations | +| `legal-halacha-supervisor.config.cjs` | pm2/js | **תזמון כל 15 דק' של `halacha_drain_supervisor.py`** (cron `*/15 * * * *`, `HALACHA_SUPERVISOR_CRON` לעקיפה). `autorestart:false` (one-shot per tick). מצב-state ב-`~/halacha-drain-monitor/` (מחוץ ל-repo). התקנה: `pm2 start scripts/legal-halacha-supervisor.config.cjs && pm2 save`. | pm2 cron (host-side) | | `ingest_digests_batch.py` | python | קליטת batch של יומוני "כל יום" מ-`data/digests/incoming/` דרך המסלול העצמאי של קורפוס-הגילוי (`digest_library.ingest_digest`) — חילוץ-LLM (תג-מושג, כותרת-הלכה, מראה-מקום, שני-תאריכים), embedding יחיד, ו-autolink לפסק המקורי (X12/INV-DIG3). רצף (לא מקבילי). מזהה-יומון+תאריך נגזרים משם-הקובץ; העלון החודשי מדולג. **לא מעביר קבצים** — ה-DB (content_hash) הוא מקור-האמת היחיד; הרצה חוזרת מדלגת על קיימים (`exists`). config מ-`~/.env`. | ידני, per-batch (חלופה ל-MCP `digest_upload`) | | `drain_digests.py` | python | ריקון תור ההעשרה של יומונים (X12): מעבד כל digest בסטטוס `pending` דרך `digest_library.enrich_digest` (חילוץ-LLM Sonnet + embedding + autolink). מקבילי (CONCURRENCY=3, env-tunable), idempotent. מוסיף `~/.local/bin` ל-PATH כדי שה-claude CLI יימצא תחת cron. בודק דגל `drain_controls('legal-digest-drain')` ב-startup → no-op כשכבוי מ-/operations. | דרך `legal-digest-drain.config.cjs` (pm2 cron) + ידני אחרי backfill. חלופת-MCP: `digest_process_pending` | | `legal-digest-drain.config.cjs` | pm2/js | **תזמון כל שעתיים של `drain_digests.py`** (cron `0 */2 * * *`, `DIGEST_DRAIN_CRON` לעקיפה) — הועבר מ-crontab של המערכת ל-pm2 כדי שיופיע ויהיה שליט בדף `/operations` (הרץ-עכשיו/הפעל/כבה). `autorestart:false` (one-shot per tick). דורש claude CLI + `VOYAGE_API_KEY`. התקנה: `pm2 start scripts/legal-digest-drain.config.cjs && pm2 save`. | pm2 cron (host-side) | diff --git a/scripts/halacha_drain_supervisor.py b/scripts/halacha_drain_supervisor.py new file mode 100644 index 0000000..9c7e51e --- /dev/null +++ b/scripts/halacha_drain_supervisor.py @@ -0,0 +1,421 @@ +#!/usr/bin/env python3 +"""Halacha-drain supervisor — permanent health manager for legal-halacha-drain. + +Babysits the EXISTING pm2 drain so the halacha-extraction backlog gets worked +reliably. Takes ZERO Claude quota itself (only the drain calls Opus); it just +reads DB/logs/pm2 and pokes the drain via the established run-now mechanism. + +Subcommands (argparse): + tick (default — what the pm2 cron fires every 15 min) run ONE health tick + burst-on start a daytime BURST — drain runs continuously until the upcoming + Saturday 18:00 IDT (or --until). MANUAL ONLY; kicks off immediately. + burst-off stop the burst now (revert to night mode) and halt the running drain + status print current mode + queue + drain state (no action) + +BURST is the chair's manual "run continuously now" window. Source of truth is the +DB (`drain_controls.burst_until`, keyed by the drain's pm2 name) — the SAME value +the /operations page reads/writes (G1, single source; G2, no parallel control +path). burst-on/off here and the /operations toggle are equivalent front-ends. + +One tick: reads queue + per-chunk staging + burst_until from the DB, pm2 status, +log tails → + • re-triggers the one-shot drain when idle and the queue is non-empty + • restarts a HUNG run (online but no new chunk-checkpoint for > 25 min — the + REAL liveness signal; the out-log only updates when a whole CASE finishes) + • backs off on rate-limit (claude_session 429) until the CLI's parsed reset; + a 429 only counts when the log is FRESH (an hours-old 429 is historical) + • verifies crash-safe per-chunk staging is committing (nothing lost) + +TWO MODES (never self-stops): + • burst — while burst_until is set and in the future: window LIFTED + (HALACHA_DRAIN_WINDOW_START==END==0), drain runs all day. + • night — otherwise: the drain's normal 23:00–05:00 IDT window; the supervisor + stays on as the permanent nightly-drain health manager. +Burst auto-EXPIRES at its deadline (DB cleared + running drain stopped) so it +never bleeds into a fresh week's quota uninvited. +""" +import argparse +import json +import os +import re +import subprocess +from datetime import datetime, timedelta, timezone +from glob import glob + +REPO = "/home/chaim/legal-ai" +RUNTIME_DIR = "/home/chaim/halacha-drain-monitor" # state (outside repo) +STATE = os.path.join(RUNTIME_DIR, "state.json") +DRAIN = "legal-halacha-drain" +OUT_LOG = "/home/chaim/.pm2/logs/legal-halacha-drain-out.log" +ERR_LOG = "/home/chaim/.pm2/logs/legal-halacha-drain-error.log" +VENV_PY = os.path.join(REPO, "mcp-server/.venv/bin/python") + +STUCK_SILENCE_SEC = 1500 # 25 min with no new chunk-checkpoint while online → hung +WEEKLY_GAP_HOURS = 6 # reset further than this → treat as weekly, not 5h +IDT = timezone(timedelta(hours=3)) # Israel summer time (IDT, UTC+3) — display only +NIGHT_START, NIGHT_END = 23, 5 # the drain's normal window (IDT hours) + + +def _now_utc(): + return datetime.now(timezone.utc) + + +def pm2_bin(): + for c in ["pm2", *glob("/home/chaim/.nvm/versions/node/*/bin/pm2")]: + try: + subprocess.run([c, "-v"], capture_output=True, timeout=10) + return c + except Exception: + continue + return "pm2" + + +PM2 = pm2_bin() +_ENV = {**os.environ, "HOME": "/home/chaim"} + + +# ── DB access (via the repo venv; the module self-configures) ──────────────── +def _venv_py(code: str, timeout: int = 120) -> str: + r = subprocess.run([VENV_PY, "-c", code], capture_output=True, text=True, + cwd=REPO, timeout=timeout, env=_ENV) + if r.returncode != 0: + raise RuntimeError((r.stderr or r.stdout)[:300]) + return r.stdout.strip() + + +def db_snapshot() -> dict: + """Queue + staging counts + burst_until, in one round-trip.""" + code = ( + "import sys,os,asyncio,json\n" + "sys.path.insert(0,'mcp-server/src'); os.environ.setdefault('HOME','/home/chaim')\n" + "from legal_mcp.services import db\n" + "async def m():\n" + " pool=await db.get_pool()\n" + " async with pool.acquire() as c:\n" + " st={r['halacha_extraction_status'] or 'unknown':r['n'] for r in await c.fetch(\"SELECT halacha_extraction_status,count(*) n FROM case_law GROUP BY 1\")}\n" + " procs=[r['case_number'] for r in await c.fetch(\"SELECT case_number FROM case_law WHERE halacha_extraction_status='processing' ORDER BY halacha_extraction_requested_at NULLS LAST LIMIT 5\")]\n" + " hal=await c.fetchval('SELECT count(*) FROM halachot')\n" + " ck=await c.fetchval('SELECT count(*) FROM precedent_chunks WHERE halacha_extracted_at IS NOT NULL')\n" + " pend_rev=await c.fetchval(\"SELECT count(*) FROM halachot WHERE review_status='pending_review'\")\n" + " bu=await c.fetchval(\"SELECT burst_until FROM drain_controls WHERE name='legal-halacha-drain'\")\n" + " print(json.dumps({'status_counts':st,'processing_cases':procs,'halachot_total':hal,'checkpointed_chunks':ck,'pending_review':pend_rev,'burst_until':bu.isoformat() if bu else None}))\n" + "asyncio.run(m())\n" + ) + return json.loads(_venv_py(code).splitlines()[-1]) + + +def db_set_burst(until_iso): + code = ( + "import sys,os,asyncio\n" + "sys.path.insert(0,'mcp-server/src'); os.environ.setdefault('HOME','/home/chaim')\n" + "from datetime import datetime\n" + "from legal_mcp.services import db\n" + f"v={until_iso!r}\n" + "async def m():\n" + " await db.set_drain_burst('legal-halacha-drain', datetime.fromisoformat(v) if v else None)\n" + "asyncio.run(m())\n" + ) + _venv_py(code, timeout=60) + + +def db_get_burst(): + code = ( + "import sys,os,asyncio\n" + "sys.path.insert(0,'mcp-server/src'); os.environ.setdefault('HOME','/home/chaim')\n" + "from legal_mcp.services import db\n" + "async def m():\n" + " v=await db.get_drain_burst('legal-halacha-drain')\n" + " print(v.isoformat() if v else '')\n" + "asyncio.run(m())\n" + ) + out = _venv_py(code, timeout=60).strip() + return datetime.fromisoformat(out) if out else None + + +def next_saturday_18_utc(): + now_idt = _now_utc().astimezone(IDT) + days = (5 - now_idt.weekday()) % 7 # Mon=0..Sat=5..Sun=6 + cand = now_idt.replace(hour=18, minute=0, second=0, microsecond=0) + timedelta(days=days) + if cand <= now_idt: + cand += timedelta(days=7) + return cand.astimezone(timezone.utc) + + +# ── state ──────────────────────────────────────────────────────────────────── +def load_state(): + try: + with open(STATE) as f: + return json.load(f) + except Exception: + return {} + + +def save_state(s): + os.makedirs(RUNTIME_DIR, exist_ok=True) + tmp = STATE + ".tmp" + with open(tmp, "w") as f: + json.dump(s, f, ensure_ascii=False, indent=2) + os.replace(tmp, STATE) + + +# ── pm2 / logs ──────────────────────────────────────────────────────────────── +def pm2_status(): + try: + r = subprocess.run([PM2, "jlist"], capture_output=True, text=True, timeout=30) + for p in json.loads(r.stdout): + if p.get("name") == DRAIN: + return p.get("pm2_env", {}).get("status", "unknown") + except Exception as e: + return f"err:{e}" + return "absent" + + +def log_age_sec(): + try: + return _now_utc().timestamp() - os.path.getmtime(OUT_LOG) + except Exception: + return None + + +def tail(path, n=120): + try: + with open(path, errors="replace") as f: + return f.readlines()[-n:] + except Exception: + return [] + + +def scan_rate_limit(): + """(recent_429, reset_dt_utc|None) — a 429 counts as recent only if it is the + LAST drain outcome (after the last 'completed' line).""" + lines = tail(OUT_LOG, 200) + tail(ERR_LOG, 120) + last_429, last_ok, reset_dt = -1, -1, None + for i, ln in enumerate(lines): + if 'api_error_status":429' in ln or "hit your session limit" in ln or "usage limit" in ln.lower(): + last_429 = i + m = re.search(r"resets\s+(\d{1,2}):(\d{2})\s*([ap]m)\s*\(UTC\)", ln, re.I) + if m: + hh, mm, ap = int(m.group(1)), int(m.group(2)), m.group(3).lower() + if ap == "pm" and hh != 12: + hh += 12 + if ap == "am" and hh == 12: + hh = 0 + now = _now_utc() + cand = now.replace(hour=hh, minute=mm, second=0, microsecond=0) + if cand <= now - timedelta(minutes=1): + cand += timedelta(days=1) + reset_dt = cand + if "completed stored=" in ln or ("[round" in ln and "processed=" in ln): + last_ok = i + return (last_429 > last_ok and last_429 >= 0), reset_dt + + +# ── actions ─────────────────────────────────────────────────────────────────── +def trigger_drain(win_start, win_end): + """Restart the one-shot drain, pushing its window env (idempotent, reboot-safe).""" + env = {**_ENV, "HALACHA_DRAIN_WINDOW_START": str(win_start), + "HALACHA_DRAIN_WINDOW_END": str(win_end)} + r = subprocess.run([PM2, "restart", DRAIN, "--update-env"], + capture_output=True, text=True, env=env, timeout=60) + return r.returncode == 0, (r.stderr or r.stdout)[:200] + + +def stop_drain(): + r = subprocess.run([PM2, "stop", DRAIN], capture_output=True, text=True, timeout=60) + return r.returncode == 0 + + +# ── the tick ────────────────────────────────────────────────────────────────── +def tick(): + now = _now_utc() + prev = load_state() + notes = [] + + try: + snap = db_snapshot() + except Exception as e: + save_state({**prev, "tick_at": now.isoformat(), "mode": "db_error"}) + print("⚠️ כשל בקריאת ה-DB:", e) + print("JSON:" + json.dumps({"ok": False, "error": str(e), "action": "none", + "mode": "db_error", "next_wake_sec": 900}, ensure_ascii=False)) + return + + # burst state from the DB + auto-expiry (manual on; auto-off at the deadline) + burst_until = datetime.fromisoformat(snap["burst_until"]) if snap.get("burst_until") else None + if burst_until and now >= burst_until: + try: + db_set_burst(None) + except Exception as e: + notes.append(f"(אזהרה: ניקוי burst ב-DB נכשל: {e})") + stop_drain() + notes.append(f"ה-burst הסתיים ({burst_until.astimezone(IDT):%a %H:%M IDT}) — חזרה למצב לילה, הדריינר נעצר.") + burst_until = None + burst = bool(burst_until and now < burst_until) + + idt_hour = now.astimezone(IDT).hour + night_open = (idt_hour >= NIGHT_START) or (idt_hour < NIGHT_END) + window_open = burst or night_open + win = (0, 0) if burst else (NIGHT_START, NIGHT_END) + phase = "burst" if burst else "night" + + sc = snap["status_counts"] + pending = int(sc.get("pending", 0)) + processing = int(sc.get("processing", 0)) + done = int(sc.get("completed", 0)) + failed = int(sc.get("failed", 0)) + hal = snap["halachot_total"] + ck = snap["checkpointed_chunks"] + proc_cases = snap["processing_cases"] + + status = pm2_status() + age = log_age_sec() + rl_recent, reset_dt = scan_rate_limit() + + d_hal = hal - prev.get("halachot_total", hal) + d_ck = ck - prev.get("checkpointed_chunks", ck) + d_done = done - prev.get("done", done) + + # cooldown — fresh-gated; honor a stored future reset + fresh = (age is not None and age < 1800) + rl_active = bool(rl_recent and fresh) + cd_dt = None + if rl_active and reset_dt: + cd_dt = reset_dt + elif prev.get("cooldown_until"): + try: + cd_dt = datetime.fromisoformat(prev["cooldown_until"]) + except Exception: + cd_dt = None + cooldown_until = cd_dt.isoformat() if cd_dt else None + in_cooldown = bool(cd_dt and now < cd_dt) + weekly = bool(cd_dt and (cd_dt - now) > timedelta(hours=WEEKLY_GAP_HOURS)) + + # progress-based liveness (chunk checkpoints, NOT log mtime) + if ck > prev.get("checkpointed_chunks", ck): + last_progress_at = now + else: + lp = prev.get("last_progress_at") + try: + last_progress_at = datetime.fromisoformat(lp) if lp else now + except Exception: + last_progress_at = now + progress_age = (now - last_progress_at).total_seconds() + hung = (status == "online" and prev.get("last_progress_at") is not None + and progress_age > STUCK_SILENCE_SEC) + + # ── decide ONE action ── + action, detail, mode = "none", "", "running" + if pending == 0 and processing == 0: + mode = "done" + notes.append("התור ריק — אין מה לחלץ.") + elif in_cooldown: + mode = "weekly_exhausted" if weekly else "ratelimited" + action = "hold" + notes.append(f"rate-limit פעיל; איפוס ~{cd_dt.astimezone(IDT):%H:%M IDT}.") + elif hung: + mode = "hung" + ok, detail = trigger_drain(*win) + action = "restart-hung" if ok else "restart-hung-failed" + notes.append(f"זוהתה תקיעה ({int(progress_age)}ש' ללא checkpoint חדש) — בוצע restart.") + elif status == "online": + mode = "running" + notes.append(f"רץ ({len(proc_cases)} בעיבוד, התקדמות לפני {int(progress_age)}ש').") + elif not window_open: + mode = "idle_off_window" + notes.append(f"מחוץ לחלון-הלילה ({NIGHT_START}:00–0{NIGHT_END}:00) — ממתין; {pending} בתור.") + else: + ok, detail = trigger_drain(*win) + action = "triggered" if ok else "trigger-failed" + notes.append(f"הדריינר היה בטל — הוצת ({phase})." if ok else f"הצתה נכשלה: {detail}") + + staging_ok = not (d_done > 0 and d_ck == 0) + if not staging_ok: + notes.append("⚠️ תיקים הושלמו אך checkpoints לא התקדמו — לבדוק staging.") + + if mode in ("ratelimited", "weekly_exhausted") and cd_dt: + next_wake = max(300, min(int((cd_dt - now).total_seconds()) + 180, 3600)) + elif mode in ("done", "idle_off_window"): + next_wake = 1800 + else: + next_wake = 900 + + save_state({ + "tick_at": now.isoformat(), "phase": phase, + "burst_until": burst_until.isoformat() if burst_until else None, + "halachot_total": hal, "checkpointed_chunks": ck, "done": done, + "pending": pending, "processing": processing, "processing_cases": proc_cases, + "last_progress_at": last_progress_at.isoformat(), + "cooldown_until": cooldown_until if in_cooldown else None, + "mode": mode, "action": action, + }) + + elapsed = (f" (מאז הקודם: +{d_hal} הלכות, +{d_ck} מקטעים, +{d_done} תיקים)" + if prev.get("tick_at") else "") + bu = f" burst→{burst_until.astimezone(IDT):%a %d/%m %H:%M}" if burst else "" + print(f"🕒 {now.astimezone(IDT):%H:%M:%S IDT} | מצב: {mode} ({phase}){bu} | פעולה: {action}") + print(f" תור: pending={pending} processing={processing} done={done} failed={failed}") + print(f" staging: halachot={hal} checkpointed_chunks={ck} pending_review={snap['pending_review']}{elapsed}") + print(f" process: pm2={status} progress_age={int(progress_age)}s staging_ok={staging_ok}") + if proc_cases: + print(f" בעיבוד עכשיו: {', '.join(proc_cases)}") + for n in notes: + print(f" • {n}") + print("JSON:" + json.dumps({ + "ok": True, "mode": mode, "phase": phase, "action": action, "next_wake_sec": next_wake, + "burst_until": burst_until.isoformat() if burst_until else None, + "pending": pending, "processing": processing, "done": done, "failed": failed, + "halachot_total": hal, "checkpointed_chunks": ck, + "delta_halachot": d_hal, "delta_checkpoints": d_ck, "delta_done": d_done, + "pm2": status, "progress_age_sec": int(progress_age), + "staging_ok": staging_ok, "rate_limited": in_cooldown, "cooldown_until": cooldown_until, + }, ensure_ascii=False)) + + +def cmd_status(): + bu = db_get_burst() + now = _now_utc() + burst = bool(bu and now < bu) + st = load_state() + print(f"מצב: {'BURST' if burst else 'night'}" + + (f" → עד {bu.astimezone(IDT):%a %d/%m %H:%M IDT}" if burst else " (חלון 23:00–05:00)")) + if st: + print(f"טיק אחרון: {st.get('tick_at','—')} | mode={st.get('mode')} action={st.get('action')}") + print(f"תור: pending={st.get('pending')} processing={st.get('processing')} " + f"done={st.get('done')} | staging halachot={st.get('halachot_total')}") + print(f"דריינר pm2: {pm2_status()}") + + +def main(): + ap = argparse.ArgumentParser(description="halacha-drain supervisor") + ap.add_argument("cmd", nargs="?", default="tick", + choices=["tick", "burst-on", "burst-off", "status"]) + ap.add_argument("--until", help="burst deadline override, ISO or 'YYYY-MM-DD HH:MM' (IDT)") + a = ap.parse_args() + + if a.cmd == "burst-on": + if a.until: + try: + until = datetime.fromisoformat(a.until) + except ValueError: + until = datetime.strptime(a.until, "%Y-%m-%d %H:%M") + if until.tzinfo is None: + until = until.replace(tzinfo=IDT) + else: + until = next_saturday_18_utc() + db_set_burst(until.isoformat()) + print(f"✅ BURST הופעל — רץ עד {until.astimezone(IDT):%A %d/%m %H:%M IDT} (או burst-off).") + tick() # kick off immediately + elif a.cmd == "burst-off": + db_set_burst(None) + stopped = stop_drain() + print(f"🛑 BURST כובה. הדריינר {'נעצר' if stopped else 'לא נעצר (כבר עצור)'}; " + f"חזרה למצב לילה (23:00–05:00).") + tick() + elif a.cmd == "status": + cmd_status() + else: + tick() + + +if __name__ == "__main__": + main() diff --git a/scripts/legal-halacha-supervisor.config.cjs b/scripts/legal-halacha-supervisor.config.cjs new file mode 100644 index 0000000..1fb7109 --- /dev/null +++ b/scripts/legal-halacha-supervisor.config.cjs @@ -0,0 +1,45 @@ +/** + * pm2 ecosystem entry for legal-halacha-supervisor — the permanent health + * manager for the halacha-extraction drain (legal-halacha-drain). + * + * Runs ONE health tick per cron fire (~every 15 min) and takes ZERO Claude quota + * itself (only the drain calls Opus); it just reads the DB/logs/pm2 and pokes the + * existing drain via the established run-now mechanism. Each tick: + * • re-triggers the one-shot drain when idle and the queue is non-empty + * • restarts a HUNG run (online but no new chunk-checkpoint > 25 min — the + * out-log only updates when a whole CASE finishes, so mtime is not liveness) + * • backs off on rate-limit (claude_session 429) until the CLI's parsed reset + * • verifies crash-safe per-chunk staging is committing (nothing lost) + * + * BURST (manual "run continuously now" window): source of truth is + * drain_controls.burst_until in the DB — the SAME value the /operations page + * reads/writes (G1 single source; G2 no parallel control path). While it is in + * the future the supervisor LIFTS the drain's night-window; otherwise the drain + * keeps its normal 23:00–05:00 IDT window. Burst auto-expires at its deadline. + * Manual front-ends: the /operations toggle, or: + * mcp-server/.venv/bin/python scripts/halacha_drain_supervisor.py burst-on + * mcp-server/.venv/bin/python scripts/halacha_drain_supervisor.py burst-off + * + * Pattern (mirrors legal-halacha-drain): cron_restart fires the script; + * autorestart:false → one-shot per tick (pm2 shows "stopped" between ticks). + * + * Install (once): + * pm2 start /home/chaim/legal-ai/scripts/legal-halacha-supervisor.config.cjs + * pm2 save + */ +const cron = process.env.HALACHA_SUPERVISOR_CRON || "*/15 * * * *"; + +module.exports = { + apps: [ + { + name: "legal-halacha-supervisor", + cwd: "/home/chaim/legal-ai", + script: "/home/chaim/legal-ai/mcp-server/.venv/bin/python", + args: "scripts/halacha_drain_supervisor.py", + env: { HOME: "/home/chaim", PYTHONUNBUFFERED: "1" }, + autorestart: false, // one-shot per cron tick + cron_restart: cron, + max_memory_restart: "300M", + }, + ], +}; diff --git a/web/app.py b/web/app.py index 4d8153f..f80660d 100644 --- a/web/app.py +++ b/web/app.py @@ -6637,8 +6637,10 @@ async def operations_snapshot(): pm2 = await _ops_pm2_services() controls = await db.get_drain_controls() + bursts = await db.get_drain_bursts() for svc in pm2["services"]: svc["disabled"] = controls.get(svc.get("name", ""), False) + svc["burst_until"] = bursts.get(svc.get("name", "")) def _iso(rows: list[dict]) -> list[dict]: for d in rows: @@ -6717,6 +6719,53 @@ async def operations_drain_toggle(name: str, body: dict = Body(...)): return {"ok": True, "name": name, "disabled": disabled} +def _next_saturday_18_il() -> datetime: + """Upcoming Saturday 18:00 Israel time (DST-safe).""" + from datetime import timedelta + from zoneinfo import ZoneInfo + il = ZoneInfo("Asia/Jerusalem") + now = datetime.now(il) + days = (5 - now.weekday()) % 7 # Mon=0 .. Sat=5 .. Sun=6 + cand = now.replace(hour=18, minute=0, second=0, microsecond=0) + timedelta(days=days) + if cand <= now: + cand += timedelta(days=7) + return cand + + +@app.post("/api/operations/drains/{name}/burst") +async def operations_drain_burst(name: str, body: dict = Body(...)): + """Start/stop a drain's MANUAL burst window (chair-controlled, from /operations). + + ``action='on'`` → ``burst_until`` = body ``until`` (ISO) or the upcoming + Saturday 18:00 Israel time. ``action='off'`` → NULL. The host supervisor + (legal-halacha-supervisor) reads this from the DB and lifts/restores the + drain's window accordingly (takes effect within one supervisor tick, ≤15 min). + Never set automatically — manual only.""" + if not name.startswith("legal-"): + raise HTTPException(403, "ניתן לשלוט רק בשירותי legal-*") + action = (body.get("action") or "").lower() + if action == "off": + await db.set_drain_burst(name, None) + return {"ok": True, "name": name, "burst_until": None} + if action == "on": + until = body.get("until") + if until: + try: + until_dt = datetime.fromisoformat(until) + except (ValueError, TypeError): + raise HTTPException(400, "until חייב להיות ISO-8601") + if until_dt.tzinfo is None: + from zoneinfo import ZoneInfo + until_dt = until_dt.replace(tzinfo=ZoneInfo("Asia/Jerusalem")) + else: + until_dt = _next_saturday_18_il() + if until_dt <= datetime.now(timezone.utc): + raise HTTPException(400, "until חייב להיות בעתיד") + await db.set_drain_burst(name, until_dt) + return {"ok": True, "name": name, "burst_until": until_dt.isoformat()} + raise HTTPException(400, "action חייב להיות on|off") + + # ── Live agents (/operations "סוכנים פעילים") ────────────────────────────── # What the pm2/queue panels can't show: WHICH agent is doing the work right now # and its live output. An agent-driven drain (e.g. the CEO heartbeat draining -- 2.49.1