Files
legal-ai/scripts/drain_halacha_queue.py
Chaim 4fa62db192
All checks were successful
G12 Leak-Guard / leak-guard (pull_request) Successful in 6s
feat(halacha): drain לילי (23:00–05:00) + per-upload חילוץ תיק-בודד דרך ה-CEO (#120)
מפריד בין ריקון-באקלוג המוני לבין חילוץ per-upload, ומסיר את ה"פקק" שגרם
timeout/process_lost ב-heartbeat של ה-CEO.

הבעיה (אבחנה 2026-06-11): לחיצת "חלץ הלכות" על תיק בודד יצרה issue (CMP-165)
שהורה ל-CEO להריץ precedent_process_pending(halacha) — בולען סינכרוני שמרוקן את
כל התור ההיסטורי (147 ממתינים, שעות) בתוך heartbeat שחסום לשעה. תוצאה: timeout
כל שעה → process_lost בפירוק קבוצת-התהליכים → retry → סטורם, והתיק הבודד (FIFO
אחרון) לא טופל. לא OOM, לא קוד שבור — אי-התאמה ארכיטקטונית.

התיקון:
1. per-upload (web/paperclip_client.py, wake_for_precedent_extraction): גוף ה-issue
   + תיאור-הפרויקט מורים כעת להריץ precedent_extract_metadata +
   precedent_extract_halachot ל-case_law_id של ה-issue **בלבד** — עם אזהרה
   מפורשת לא להריץ process_pending. reextract_halachot כבר מנקה requested_at
   ומסמן completed → התיק לא יחזור לתור הלילי.
2. הוראות ה-CEO (.claude/agents/legal-ceo.md): אותו שינוי — חילוץ תיק-בודד, לא
   ריקון-תור. (צריך sync_agents_across_companies.py --apply אחרי מיזוג.)
3. ריקון-באקלוג (scripts/drain_halacha_queue.py): שער חלון-לילה 23:00–05:00 שעון
   ישראל (zoneinfo, DST-safe — המכונה UTC). מחוץ לחלון ===SKIP===; נעצר ===STOP===
   כשהחלון נסגר, השאר ממשיך בלילה הבא (FIFO + per-chunk checkpoint). env:
   HALACHA_DRAIN_WINDOW_START/_END/_TZ.
4. cron (scripts/legal-halacha-drain.config.cjs): UTC band 20:00–03:00 שמכסה את
   חלון-ישראל בשני מצבי-DST; הסקריפט גוזם לחלון המדויק. ירייה שעתית מחדשת
   one-shot שמת (advisory-lock → חפיפה בטוחה).

רשת-ביטחון: request_halacha_extraction עדיין מסמן requested_at, כך שאם wakeup
ל-CEO נכשל — הדריינר הלילי יתפוס את התיק (בלילה, חסום), אך שום נתיב יומי לא
מרוקן את כל התור.

Invariants: מקיים G12/INV-PORT1 (paperclip_client = shell; leak_guard עובר).
נוגע X16 (durability — מתקציב-זמן heartbeat ל-job ייעודי).

בדיקות: py_compile ✓ · window-logic + zoneinfo ✓ (17:00 IDT→False) · leak_guard ✓.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-11 14:02:38 +00:00

87 lines
3.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Drain the halacha extraction queue for the incoming batch.
Calls the canonical process_pending_extractions(kind='halacha') in small batches
until the queue is empty (two consecutive zero-progress rounds). Serial + global
advisory-lock coordinated inside the service — avoids concurrent Claude load spikes.
NIGHT-WINDOW: halacha extraction is slow (Opus, ~10 min/case) and token-heavy, so
the backlog drain runs ONLY in an off-hours window (default 23:0005:00 Israel
time) — it never competes with daytime interactive work or other agents. A tick
that starts at 23:00 keeps going until the queue empties OR the window closes
(checked before every round); whatever's left resumes the next night (FIFO +
per-chunk checkpoint → no lost or duplicated work). Single-case extraction
requested by the chair goes through the CEO immediately and is NOT gated here.
Window is DST-safe (zoneinfo) — the host runs in UTC. Env overrides:
HALACHA_DRAIN_WINDOW_START / _END (hours, 023) · HALACHA_DRAIN_TZ.
Run: mcp-server/.venv/bin/python scripts/drain_halacha_queue.py
"""
import asyncio
import os
import sys
from datetime import datetime
from zoneinfo import ZoneInfo
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "mcp-server", "src"))
from legal_mcp.services import db
from legal_mcp.services import precedent_library as pl
_TZ = ZoneInfo(os.environ.get("HALACHA_DRAIN_TZ", "Asia/Jerusalem"))
_WINDOW_START = int(os.environ.get("HALACHA_DRAIN_WINDOW_START", "23"))
_WINDOW_END = int(os.environ.get("HALACHA_DRAIN_WINDOW_END", "5"))
def _in_window() -> bool:
"""True iff the current Israel-time hour is inside [START, END).
Handles the midnight wrap (e.g. 23→5): the window is the union of
[START,24) and [0,END). If START == END the window is treated as 'always'.
"""
if _WINDOW_START == _WINDOW_END:
return True
hour = datetime.now(_TZ).hour
if _WINDOW_START < _WINDOW_END: # same-day window, e.g. 1→5
return _WINDOW_START <= hour < _WINDOW_END
return hour >= _WINDOW_START or hour < _WINDOW_END # wraps midnight, e.g. 23→5
async def main():
# /operations "disable" switch — no-op immediately if turned off (pm2
# cron_restart can still fire a stopped job, so the gate lives in the DB).
if await db.is_drain_disabled("legal-halacha-drain"):
print("===SKIP=== legal-halacha-drain disabled via /operations", flush=True)
return
if not _in_window():
now = datetime.now(_TZ).strftime("%H:%M %Z")
print(f"===SKIP=== outside drain window {_WINDOW_START:02d}:00"
f"{_WINDOW_END:02d}:00 (now {now})", flush=True)
return
total = 0
empty_rounds = 0
rnd = 0
while empty_rounds < 2:
if not _in_window():
print(f"===STOP=== drain window closed ({_WINDOW_END:02d}:00) — "
f"{total} cases this run; rest resumes next night", flush=True)
break
rnd += 1
out = await pl.process_pending_extractions(kind="halacha", limit=4)
processed = out.get("processed", 0)
total_pending = out.get("total_pending", 0)
total += processed
print(f"[round {rnd}] processed={processed} total_pending={total_pending} status={out.get('status')}", flush=True)
for r in out.get("results", []):
print(f" {r.get('case_number')}: {r.get('status')} stored={r.get('stored')} retry={r.get('retry_attempts')}", flush=True)
if processed == 0:
empty_rounds += 1
await asyncio.sleep(5)
else:
empty_rounds = 0
print(f"\n===DONE=== total halachot rounds processed; cases handled cumulatively={total}", flush=True)
if __name__ == "__main__":
asyncio.run(main())