From 4ac661b0a67e19bd16bb7046132144b7f74b3f9b Mon Sep 17 00:00:00 2001 From: Chaim Date: Mon, 15 Jun 2026 04:11:09 +0000 Subject: [PATCH] =?UTF-8?q?feat(halacha):=20=D7=A2=D7=A6=D7=99=D7=A8=D7=94?= =?UTF-8?q?-=D7=A8=D7=9B=D7=94=20=D7=A9=D7=9C=20=D7=94=D7=93=D7=A8=D7=99?= =?UTF-8?q?=D7=99=D7=A0=D7=A8=20=D7=91=D7=A1=D7=A3-=D7=A0=D7=99=D7=A6?= =?UTF-8?q?=D7=95=D7=9C=20+=20=D7=9E=D7=A7=D7=95=D7=A8-=D7=90=D7=9E=D7=AA?= =?UTF-8?q?=20=D7=99=D7=97=D7=99=D7=93=20=D7=9C=D7=9E=D7=9B=D7=A1=D7=94?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit בהגיע סף-רך (5-שעות ≥75% / שבועי ≥65%) הדריינר מסיים את התיק שרץ ועוצר לבד בין תיקים (===STOP===) — במקום שהסופרוויזר יקטול אותו ב-pm2 stop באמצע חילוץ. ב-75% יש מכסה לסיים את התיק; הקטילה נשמרת רק ל-429 טרי אמיתי. - חדש legal_mcp/services/usage_limits.py (stdlib-only): מקור-אמת יחיד — subscription_usage / USAGE_CEILINGS / ceiling_status. מיובא גם מ-system-python (supervisor) וגם מה-venv (drain). __init__ ריקים → import בטוח מחוץ ל-venv. - supervisor: מייבא מהמודול (הסרת ההעתקים המקומיים, ~50 שורות פחות); quota_exhausted/quota_available הפכו wrappers דקים; ענף cooldown — קטילה (hold-stopped) רק אם log_rl (429 טרי), אחרת hold-soft בלי pm2 stop. - drain: limit=4→1 (בדיקת-סף בין כל תיק); שער-סף ב-run_in_executor, fail-OPEN כש-endpoint None (הסופרוויזר מגבה ב-429-kill); שמירת קצב 30ש' בין תיקים (pl.INTER_PRECEDENT_COOLDOWN_SEC — limit=1 ביטל את המרווח הפנימי-לסבב). - SCRIPTS.md עודכן (limit=1, שער-סף, hold-soft, מקור-אמת משותף). אומת end-to-end (endpoint חי): (1) drain עם סף מורד → ===STOP=== usage ceiling בלי לעבד תיק; (2) supervisor status=online+סף-רך → action=hold-soft, stop_drain נקרא 0; (3) 429 טרי → hold-stopped, stop_drain נקרא 1. py_compile עובר. court_fetch_service/usage_status (העתק שלישי, async/aiohttp, רגיש-דיפלוי) נדחה במכוון לאיחוד-עתידי — לא נוגעים בגשר-המארח כאן. Invariants: G1 (נרמול-במקור — endpoint יחיד), G2 (אין מסלול-בקרה מקביל — מודול משותף יחיד, drain+supervisor קוראים אותו דבר), X16 (עמידות — עצירה לפני 429 מונעת חילוץ-מחדש משחית). G12 לא רלוונטי. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../src/legal_mcp/services/usage_limits.py | 103 +++++++++++++++ scripts/SCRIPTS.md | 4 +- scripts/drain_halacha_queue.py | 36 +++++- scripts/halacha_drain_supervisor.py | 119 ++++++------------ 4 files changed, 174 insertions(+), 88 deletions(-) create mode 100644 mcp-server/src/legal_mcp/services/usage_limits.py diff --git a/mcp-server/src/legal_mcp/services/usage_limits.py b/mcp-server/src/legal_mcp/services/usage_limits.py new file mode 100644 index 0000000..49ded9c --- /dev/null +++ b/mcp-server/src/legal_mcp/services/usage_limits.py @@ -0,0 +1,103 @@ +"""claude.ai subscription-usage ceilings — the single source of truth. + +ONE place that reads the (undocumented) OAuth usage endpoint and decides whether +a usage window has crossed its soft stop-before-429 ceiling. Imported by BOTH the +halacha drain (`scripts/drain_halacha_queue.py`) and its supervisor +(`scripts/halacha_drain_supervisor.py`) so the two never drift (G1/G2). + +STRICTLY stdlib — no asyncpg / aiohttp / config imports. The supervisor runs as +plain system ``python3`` and imports this module directly; pulling in heavy deps +here would break that import. (``legal_mcp/__init__`` and ``services/__init__`` +are intentionally empty, which is what makes the system-python import work.) + +Soft ceilings (chair, 2026-06-15): stop the drain BEFORE a window exhausts so the +in-flight case finishes on the remaining quota and the drain idles until reset, +instead of hammering 429 (which burns retries and leaves cases half-extracted). +5-hour ("hourly session") window stops at 75%, the weekly windows at 65%. +Overridable via env for ops tuning without a redeploy. +""" +import json +import os +import urllib.request +from datetime import datetime, timezone + +# claude.ai subscription usage. The token lives in the CLI's own credentials +# file; the claude-code User-Agent is REQUIRED — without it the request lands in +# an aggressively rate-limited bucket and 429s. Unofficial endpoint: may change, +# so every caller must tolerate a None return and fall back. +CLAUDE_CRED_PATH = "/home/chaim/.claude/.credentials.json" +OAUTH_USAGE_URL = "https://api.anthropic.com/api/oauth/usage" +USAGE_UA = "claude-code/2.1.177" + + +def _env_int(name: str, default: int) -> int: + try: + return int(os.environ.get(name, default)) + except (TypeError, ValueError): + return default + + +# Reaching a ceiling is treated EXACTLY like 100% exhaustion (cooldown until that +# window's resets_at). Both weekly keys share one threshold; the per-model cap +# that's actually populated on this account is Sonnet (seven_day_opus is null) and +# the all-models seven_day cap is the backstop for Opus usage either way. +CEILING_FIVE_HOUR = _env_int("HALACHA_DRAIN_CEILING_5H", 75) +CEILING_WEEKLY = _env_int("HALACHA_DRAIN_CEILING_WEEKLY", 65) +USAGE_CEILINGS = { + "five_hour": CEILING_FIVE_HOUR, + "seven_day": CEILING_WEEKLY, + "seven_day_sonnet": CEILING_WEEKLY, +} + + +def subscription_usage() -> dict | None: + """Read the claude.ai subscription usage — the exact 5-hour / 7-day + utilization the Claude Code UI shows — from the OAuth usage endpoint. + + Returns the parsed JSON (keys: five_hour, seven_day, seven_day_opus, + seven_day_sonnet, extra_usage; each window → {utilization 0-100, resets_at}) + or None on ANY failure. Undocumented endpoint — every caller must tolerate + None and fall back.""" + try: + with open(CLAUDE_CRED_PATH) as f: + token = json.load(f)["claudeAiOauth"]["accessToken"] + except Exception: + return None + req = urllib.request.Request(OAUTH_USAGE_URL, headers={ + "Authorization": f"Bearer {token}", + "User-Agent": USAGE_UA, # required — else aggressive 429 + "anthropic-beta": "oauth-2025-04-20", + }) + try: + with urllib.request.urlopen(req, timeout=15) as resp: + return json.loads(resp.read().decode("utf-8")) + except Exception: + return None + + +def ceiling_status(usage: dict) -> tuple[bool, datetime | None, str]: + """Evaluate an already-fetched usage dict against USAGE_CEILINGS. + + Returns (over, earliest_reset_utc, detail): + • over — True iff ANY gated window is at/above its ceiling + • earliest_reset — soonest resets_at among the windows that are over (UTC), + or None + • detail — short log string, e.g. "5h=78%/75 weekly=40%/65" + + Takes the usage dict as a parameter (does NOT fetch) so the caller owns the + single network read. null utilization → treated as 0% (window inactive).""" + over, resets, parts = False, [], [] + label = {"five_hour": "5h", "seven_day": "weekly", "seven_day_sonnet": "weekly-sonnet"} + for w, ceiling in USAGE_CEILINGS.items(): + info = usage.get(w) or {} + util = info.get("utilization") or 0 + parts.append(f"{label.get(w, w)}={util:.0f}%/{ceiling}") + if util >= ceiling: + over = True + r = info.get("resets_at") + if r: + try: + resets.append(datetime.fromisoformat(r).astimezone(timezone.utc)) + except Exception: + pass + return over, (min(resets) if resets else None), " ".join(parts) diff --git a/scripts/SCRIPTS.md b/scripts/SCRIPTS.md index 1286694..162db54 100644 --- a/scripts/SCRIPTS.md +++ b/scripts/SCRIPTS.md @@ -114,9 +114,9 @@ | `run_curator_deepseek_test_v2.sh` | A/B test #2 (2026-05-05) — אותו run אבל עם interaction. תוצאה: 9:08 דק׳, 5 ממצאים, היחיד מ-4 הריצות שזיהה תוצאה עובדתית נכונה (קבלה חלקית). interaction נכשל ב-API ("Agent run id required" בריצה ידנית). | החלפת Curator לאדפטר DeepSeek מקומי | | `run_curator_sonnet_rerun.sh` | A/B test #3 (2026-05-05) — ריצה חוזרת של Sonnet 4.5 על אותו CMP-78. תוצאה: 12:52 דק׳ (לעומת 20:13 בריצה המקורית — כי בלי לולאת interaction.json). זיהה תוצאה שגויה ("דחייה") **בעקביות עם הריצה המקורית** — Sonnet עקבי-בטעות, DeepSeek אקראי. | בדיקה חד-פעמית — לא להריץ שוב | | `ingest_incoming_batch.py` | python | קליטת batch של החלטות ועדת ערר מ-`data/precedents/incoming/` דרך המסלול הקנוני (`ingest_internal_decision`) + חילוץ מטא-דאטה לכל תיק (המסלול הפנימי לא מתזמן metadata — INV-ING3). רצף (לא מקבילי, להימנע מעומס CLI). רשימת `DECISIONS` נערכת ידנית לכל batch. config מ-`~/.env`. תומך תהליך [[project_precedent_incoming_workflow]]. | ידני, per-batch (חלופה ל-MCP `internal_decision_upload` כש-batch גדול) | -| `drain_halacha_queue.py` | python | ריקון תור חילוץ ההלכות (`process_pending_extractions kind='halacha'`) ב-batches של 4 עד שהתור ריק (2 סבבים ריקים). **רץ רק בחלון-לילה 23:00–05:00 שעון ישראל** (`_in_window`, zoneinfo DST-safe — המכונה UTC); מחוץ לחלון `===SKIP===`, ונעצר `===STOP===` כשהחלון נסגר (השאר ממשיך בלילה הבא, FIFO+checkpoint). env: `HALACHA_DRAIN_WINDOW_START`/`_END`/`HALACHA_DRAIN_TZ`. **kill-switch `/operations`:** בודק `is_drain_disabled` בעלייה **וגם בתחילת כל סבב** — כיבוי באמצע-ריצה עוצר את הלולאה בגבול-הסבב הבא (התהליך עצמו נהרג מיד דרך ה-UI-toggle/סופרוייזר). חילוץ-הלכות נשאר על claude_session (לא Gemini). self-heal ל-orphaned `processing`. ההלכות נוחתות `pending_review` (שער-יו"ר). **חילוץ תיק-בודד שהיו"ר מבקש רץ מיד דרך ה-CEO (`precedent_extract_halachot`) ואינו מגודר כאן.** | דרך `legal-halacha-drain.config.cjs` (pm2 cron) / ידני | +| `drain_halacha_queue.py` | python | ריקון תור חילוץ ההלכות (`process_pending_extractions kind='halacha'`) **תיק-בודד לסבב (`limit=1`)** עד שהתור ריק (2 סבבים ריקים), עם קצב `pl.INTER_PRECEDENT_COOLDOWN_SEC` בין תיקים (נשמר אחרי המעבר ל-limit=1 שביטל את המרווח הפנימי-לסבב). **רץ רק בחלון-לילה 23:00–05:00 שעון ישראל** (`_in_window`, zoneinfo DST-safe — המכונה UTC); מחוץ לחלון `===SKIP===`, ונעצר `===STOP===` כשהחלון נסגר (השאר ממשיך בלילה הבא, FIFO+checkpoint). env: `HALACHA_DRAIN_WINDOW_START`/`_END`/`HALACHA_DRAIN_TZ`. **שער-סף-ניצול (נוהל-יו"ר):** בין כל תיק בודק את ניצול-claude.ai דרך מקור-האמת המשותף `legal_mcp.services.usage_limits` (`USAGE_CEILINGS` — אותו שה-supervisor קורא) ועוצר `===STOP=== usage ceiling reached` כשנחצה סף-רך (5-שעות ≥75% / שבועי ≥65%, env `HALACHA_DRAIN_CEILING_5H`/`_WEEKLY`). הבדיקה **בין** תיקים → התיק שרץ תמיד מסתיים על המכסה שנותרה ופשוט לא מתחיל חדש — עצירה **לפני** 429, לא קטילה. fail-OPEN אם ה-endpoint לא-זמין (ה-supervisor מגבה בקטילת-429). **kill-switch `/operations`:** בודק `is_drain_disabled` בעלייה **וגם בתחילת כל סבב** — כיבוי באמצע-ריצה עוצר את הלולאה בגבול-הסבב הבא (התהליך עצמו נהרג מיד דרך ה-UI-toggle/סופרוייזר). חילוץ-הלכות נשאר על claude_session (לא Gemini). self-heal ל-orphaned `processing`. ההלכות נוחתות `pending_review` (שער-יו"ר). **חילוץ תיק-בודד שהיו"ר מבקש רץ מיד דרך ה-CEO (`precedent_extract_halachot`) ואינו מגודר כאן.** | דרך `legal-halacha-drain.config.cjs` (pm2 cron) / ידני | | `legal-halacha-drain.config.cjs` | pm2/js | **תזמון חלון-לילה של `drain_halacha_queue.py`** (cron UTC `10 20,21,22,23,0,1,2,3 * * *` = superset שמכסה את 23:00–05:00 ישראל בקיץ ובחורף; הסקריפט גוזם לחלון המדויק ב-zoneinfo). דקת-הצתה `:10` (לא `:00`) כדי לא לחלוק דקה עם metadata-drain (`:00`) או supervisor (`:05`) — מונע deadlock של DDL-המיגרציה כששני דריינים עולים יחד. `HALACHA_DRAIN_CRON` לעקיפה. ירייה כל שעה גם מחדשת one-shot שמת באמצע (advisory-lock הופך חפיפה לבטוחה). דורש claude CLI. התקנה: `pm2 start scripts/legal-halacha-drain.config.cjs && pm2 save`. | pm2 cron (host-side) | -| `halacha_drain_supervisor.py` | python | **מנהל-בריאות קבוע ל-`legal-halacha-drain`** (אפס צריכת-Claude — קורא DB/לוגים/pm2 ומצית את הדריינר הקיים). טיק יחיד: **מכבד `is_drain_disabled` בעדיפות עליונה — אם כבוי ב-/operations עוצר את הדריינר ולא מצית** · מצית כשבטל+תור≠ריק · restart ל-run תקוע (liveness לפי checkpoints-per-chunk, **לא** mtime-לוג שמתעדכן רק בסיום תיק ~10 דק') · **זיהוי rate-limit: ה-PRIMARY הוא `quota_exhausted()` הסמכותי (קורא `subscription_usage()` — endpoint לא-מתועד `GET /api/oauth/usage`, token מ-`~/.claude/.credentials.json` + UA `claude-code/*` חובה אחרת 429 — אותו אחוז-ניצול 5-שעות/שבועי/שבועי-Sonnet שה-UI מציג. **ספי-עצירה-רכים `USAGE_CEILINGS` (נוהל-יו"ר 2026-06-15): עוצר לפני 429 — 5-שעות ≥75%, שבועי+שבועי-Sonnet ≥65% (עקיפה: `HALACHA_DRAIN_CEILING_5H`/`HALACHA_DRAIN_CEILING_WEEKLY`); הגעה לסף = בדיוק כמו מיצוי, cooldown עד `resets_at` של אותו חלון. הנימוק: 429 באמצע-תיק כופה חילוץ-מחדש של תיק שהושלם ומשחית אותו**), durable ואינו תלוי בעומק-זנב-הלוג; `scan_rate_limit` (429 + parse איפוס, מגודר-טריות) רק כ-fallback כש-endpoint לא-זמין — תיקון הבאג שבו 429 שגלל מתחת ל-churn של ה-restart נקרא כ"hung" וגרר restart-storm שבזבז מכסה.** בזמן מוגבל **עוצר את הדריינר** (`hold-stopped`) כדי לא להלום 429, ומצית-מחדש כשהמכסה חוזרת (כל החלונות חזרו מתחת לסף; fallback `claude -p` זעיר אם ה-endpoint לא זמין). `status` מדפיס ניצול/סף-עצירה · **חלון catch-up בוקר `[05:00–07:00 IDT)` (`CATCHUP_END`) שנפתח רק לניקוי backlog שנותר כשהמכסה חזרה — איפוס-5-שעות מאוחר נוחת לרוב מעט אחרי 05:00; מגודר ב-(לא-מוגבל)+(תור≠ריק), והקצה המורחב מועבר לדריינר כך ש-window-self-guard שלו מקבל את סבבי-ה-catch-up** · מאמת ש-staging מתחייב. **BURST** (חלון "רוץ ברצף עכשיו" ידני): מקור-אמת = `drain_controls.burst_until` ב-DB — אותו ערך ש-/operations קורא/כותב (G1 מקור-יחיד, G2 בלי מסלול מקביל); בעתיד→חלון מורם, אחרת חלון-לילה 23-05; פג-תוקף אוטומטי במועד. תת-פקודות: `tick` (ברירת-מחדל), `burst-on [--until]`, `burst-off`, `status`. | דרך `legal-halacha-supervisor.config.cjs` (pm2 cron) / ידני / כפתור /operations | +| `halacha_drain_supervisor.py` | python | **מנהל-בריאות קבוע ל-`legal-halacha-drain`** (אפס צריכת-Claude — קורא DB/לוגים/pm2 ומצית את הדריינר הקיים). טיק יחיד: **מכבד `is_drain_disabled` בעדיפות עליונה — אם כבוי ב-/operations עוצר את הדריינר ולא מצית** · מצית כשבטל+תור≠ריק · restart ל-run תקוע (liveness לפי checkpoints-per-chunk, **לא** mtime-לוג שמתעדכן רק בסיום תיק ~10 דק') · **זיהוי rate-limit: ה-PRIMARY הוא `quota_exhausted()` הסמכותי (wrapper דק מעל מקור-האמת המשותף `legal_mcp.services.usage_limits` — `subscription_usage()`+`ceiling_status()`+`USAGE_CEILINGS`, אותו שהדריינר קורא, G1/G2 בלי כפילות; endpoint לא-מתועד `GET /api/oauth/usage`, token מ-`~/.claude/.credentials.json` + UA `claude-code/*` חובה אחרת 429 — אותו אחוז-ניצול 5-שעות/שבועי/שבועי-Sonnet שה-UI מציג. **ספי-עצירה-רכים `USAGE_CEILINGS` (נוהל-יו"ר 2026-06-15): עוצר לפני 429 — 5-שעות ≥75%, שבועי+שבועי-Sonnet ≥65% (עקיפה: `HALACHA_DRAIN_CEILING_5H`/`HALACHA_DRAIN_CEILING_WEEKLY`); הגעה לסף = בדיוק כמו מיצוי, cooldown עד `resets_at` של אותו חלון. הנימוק: 429 באמצע-תיק כופה חילוץ-מחדש של תיק שהושלם ומשחית אותו**), durable ואינו תלוי בעומק-זנב-הלוג; `scan_rate_limit` (429 + parse איפוס, מגודר-טריות) רק כ-fallback כש-endpoint לא-זמין — תיקון הבאג שבו 429 שגלל מתחת ל-churn של ה-restart נקרא כ"hung" וגרר restart-storm שבזבז מכסה.** **שתי תגובות ל-cooldown:** בסף-רך (≥75/65, **בלי** 429 טרי) — `hold-soft`: **אינו קוטל**, הדריינר קורא את אותו סף ועוצר לבד בין תיקים אחרי שמסיים את התיק הנוכחי (יש מכסה לסיים); קטילה (`hold-stopped`, `pm2 stop`) רק ב-**429 טרי אמיתי** (`log_rl` — הקריאות באמת נכשלות). מצית-מחדש כשהמכסה חוזרת (כל החלונות מתחת לסף; fallback `claude -p` זעיר אם ה-endpoint לא זמין). `status` מדפיס ניצול/סף-עצירה · **חלון catch-up בוקר `[05:00–07:00 IDT)` (`CATCHUP_END`) שנפתח רק לניקוי backlog שנותר כשהמכסה חזרה — איפוס-5-שעות מאוחר נוחת לרוב מעט אחרי 05:00; מגודר ב-(לא-מוגבל)+(תור≠ריק), והקצה המורחב מועבר לדריינר כך ש-window-self-guard שלו מקבל את סבבי-ה-catch-up** · מאמת ש-staging מתחייב. **BURST** (חלון "רוץ ברצף עכשיו" ידני): מקור-אמת = `drain_controls.burst_until` ב-DB — אותו ערך ש-/operations קורא/כותב (G1 מקור-יחיד, G2 בלי מסלול מקביל); בעתיד→חלון מורם, אחרת חלון-לילה 23-05; פג-תוקף אוטומטי במועד. תת-פקודות: `tick` (ברירת-מחדל), `burst-on [--until]`, `burst-off`, `status`. | דרך `legal-halacha-supervisor.config.cjs` (pm2 cron) / ידני / כפתור /operations | | `legal-halacha-supervisor.config.cjs` | pm2/js | **תזמון כל 15 דק' של `halacha_drain_supervisor.py`** (cron `5-59/15 * * * *` = `:05,:20,:35,:50`, `HALACHA_SUPERVISOR_CRON` לעקיפה; דקת-הצתה `:05` כדי לא לחלוק דקה עם metadata-drain `:00` או halacha-drain `:10` — מונע deadlock של DDL-המיגרציה). `autorestart:false` (one-shot per tick). מצב-state ב-`~/halacha-drain-monitor/` (מחוץ ל-repo). התקנה: `pm2 start scripts/legal-halacha-supervisor.config.cjs && pm2 save`. | pm2 cron (host-side) | | `ingest_digests_batch.py` | python | קליטת batch של יומוני "כל יום" מ-`data/digests/incoming/` דרך המסלול העצמאי של קורפוס-הגילוי (`digest_library.ingest_digest`) — חילוץ-LLM (תג-מושג, כותרת-הלכה, מראה-מקום, שני-תאריכים), embedding יחיד, ו-autolink לפסק המקורי (X12/INV-DIG3). רצף (לא מקבילי). מזהה-יומון+תאריך נגזרים משם-הקובץ; העלון החודשי מדולג. **לא מעביר קבצים** — ה-DB (content_hash) הוא מקור-האמת היחיד; הרצה חוזרת מדלגת על קיימים (`exists`). config מ-`~/.env`. | ידני, per-batch (חלופה ל-MCP `digest_upload`) | | `drain_digests.py` | python | ריקון תור ההעשרה של יומונים (X12): מעבד כל digest בסטטוס `pending` דרך `digest_library.enrich_digest` (חילוץ-LLM Sonnet + embedding + autolink). מקבילי (CONCURRENCY=3, env-tunable), idempotent. מוסיף `~/.local/bin` ל-PATH כדי שה-claude CLI יימצא תחת cron. בודק דגל `drain_controls('legal-digest-drain')` ב-startup → no-op כשכבוי מ-/operations. | דרך `legal-digest-drain.config.cjs` (pm2 cron) + ידני אחרי backfill. חלופת-MCP: `digest_process_pending` | diff --git a/scripts/drain_halacha_queue.py b/scripts/drain_halacha_queue.py index a558e19..7294515 100644 --- a/scripts/drain_halacha_queue.py +++ b/scripts/drain_halacha_queue.py @@ -1,6 +1,6 @@ """Drain the halacha extraction queue for the incoming batch. -Calls the canonical process_pending_extractions(kind='halacha') in small batches +Calls the canonical process_pending_extractions(kind='halacha') ONE case per round until the queue is empty (two consecutive zero-progress rounds). Serial + global advisory-lock coordinated inside the service — avoids concurrent Claude load spikes. @@ -14,6 +14,14 @@ requested by the chair goes through the CEO immediately and is NOT gated here. Window is DST-safe (zoneinfo) — the host runs in UTC. Env overrides: HALACHA_DRAIN_WINDOW_START / _END (hours, 0–23) · HALACHA_DRAIN_TZ. +USAGE-CEILING: between cases the drain checks the claude.ai utilization and stops +GRACEFULLY (===STOP===) once a soft ceiling is crossed (5-hour ≥75% / weekly ≥65%, +USAGE_CEILINGS in legal_mcp.services.usage_limits — the SAME source the supervisor +reads). Because it's checked between cases, the in-flight case always finishes on +the remaining quota; the drain just doesn't start a new one — stopping BEFORE a 429 +rather than hammering it. Resumes when the window resets. Env overrides: +HALACHA_DRAIN_CEILING_5H / _WEEKLY. + Run: mcp-server/.venv/bin/python scripts/drain_halacha_queue.py """ @@ -27,6 +35,7 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "mcp-server", " from legal_mcp.services import db from legal_mcp.services import precedent_library as pl +from legal_mcp.services import usage_limits _TZ = ZoneInfo(os.environ.get("HALACHA_DRAIN_TZ", "Asia/Jerusalem")) _WINDOW_START = int(os.environ.get("HALACHA_DRAIN_WINDOW_START", "23")) @@ -74,8 +83,26 @@ async def main(): print(f"===STOP=== drain window closed ({_WINDOW_END:02d}:00) — " f"{total} cases this run; rest resumes next night", flush=True) break + # Soft usage-ceiling gate (chair: 5-hour ≥75% / weekly ≥65%). Checked + # BETWEEN cases (limit=1 below), so the in-flight case always finishes on + # the remaining quota and we just don't start a new one — stopping BEFORE + # a 429 instead of hammering it. Same USAGE_CEILINGS the supervisor reads + # (single source of truth: legal_mcp.services.usage_limits). + # FAIL-OPEN on endpoint failure (returns None): keep draining — the + # supervisor backstops a real 429 by hard-killing. Do NOT "fix" this to + # fail-closed; a throttled endpoint must not halt the night's drain. + usage = await asyncio.get_event_loop().run_in_executor( + None, usage_limits.subscription_usage) + if usage is not None: + over, _, detail = usage_limits.ceiling_status(usage) + if over: + print(f"===STOP=== usage ceiling reached ({detail}) — finished " + f"{total} case(s); resumes when quota resets", flush=True) + break rnd += 1 - out = await pl.process_pending_extractions(kind="halacha", limit=4) + # limit=1 → the ceiling (and window/disabled) are re-checked between every + # case, so a soft stop never abandons an in-flight extraction. + out = await pl.process_pending_extractions(kind="halacha", limit=1) processed = out.get("processed", 0) total_pending = out.get("total_pending", 0) total += processed @@ -87,6 +114,11 @@ async def main(): await asyncio.sleep(5) else: empty_rounds = 0 + # Preserve the inter-case spacer that process_pending_extractions + # applies WITHIN a batch (only when idx>0) — with limit=1 every batch + # is one case, so it never fires there; pace here instead to avoid + # 429 bursts. Single source: the service's own cooldown constant. + await asyncio.sleep(pl.INTER_PRECEDENT_COOLDOWN_SEC) print(f"\n===DONE=== total halachot rounds processed; cases handled cumulatively={total}", flush=True) diff --git a/scripts/halacha_drain_supervisor.py b/scripts/halacha_drain_supervisor.py index a892652..f0ff4f1 100644 --- a/scripts/halacha_drain_supervisor.py +++ b/scripts/halacha_drain_supervisor.py @@ -49,19 +49,12 @@ import json import os import re import subprocess +import sys import urllib.request from datetime import datetime, timedelta, timezone from glob import glob REPO = "/home/chaim/legal-ai" -# claude.ai subscription usage — the same 5-hour / 7-day utilization the Claude -# Code status bar shows, via the (undocumented) OAuth usage endpoint. The token -# lives in the CLI's own credentials file; the claude-code User-Agent is -# REQUIRED — without it the request lands in an aggressively rate-limited bucket -# and 429s. Unofficial endpoint: may change, so callers must tolerate None. -CLAUDE_CRED_PATH = "/home/chaim/.claude/.credentials.json" -OAUTH_USAGE_URL = "https://api.anthropic.com/api/oauth/usage" -_USAGE_UA = "claude-code/2.1.177" RUNTIME_DIR = "/home/chaim/halacha-drain-monitor" # state (outside repo) STATE = os.path.join(RUNTIME_DIR, "state.json") DRAIN = "legal-halacha-drain" @@ -77,29 +70,17 @@ NIGHT_START, NIGHT_END = 23, 5 # the drain's normal window (IDT hours) CATCHUP_END = 7 # soft window end (IDT) for early-morning catch-up — see fix B -def _env_int(name: str, default: int) -> int: - try: - return int(os.environ.get(name, default)) - except (TypeError, ValueError): - return default - - -# Soft utilization ceilings — stop the drain BEFORE a window actually exhausts -# (429s). Hitting a 429 mid-case forces re-extraction of an already-completed -# case under the rate limit, DEGRADING it; stopping at the chair's ceilings instead -# lets the in-flight halacha case finish cleanly and the drain idle until the -# window resets. Reaching a ceiling is treated EXACTLY like 100% exhaustion -# (cooldown until that window's resets_at). Per the chair (2026-06-15): the 5-hour -# ("hourly session") window stops at 75%, the weekly windows at 65%. Both keys map -# to the same windows quota_available / quota_exhausted gate on; overridable via -# env for ops tuning without a redeploy. -CEILING_FIVE_HOUR = _env_int("HALACHA_DRAIN_CEILING_5H", 75) -CEILING_WEEKLY = _env_int("HALACHA_DRAIN_CEILING_WEEKLY", 65) -USAGE_CEILINGS = { - "five_hour": CEILING_FIVE_HOUR, - "seven_day": CEILING_WEEKLY, - "seven_day_sonnet": CEILING_WEEKLY, -} +# Usage-ceiling logic (subscription_usage / ceiling_status / USAGE_CEILINGS) is +# the SINGLE source of truth, shared with the drain — see +# legal_mcp/services/usage_limits.py. The supervisor runs as system python3, so +# put the (stdlib-only) package on the path before importing it. Resolve relative +# to THIS file (same as drain_halacha_queue.py) so the module is loaded from the +# same checkout the script lives in. +sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), + "..", "mcp-server", "src")) +from legal_mcp.services.usage_limits import ( # noqa: E402 + USAGE_CEILINGS, ceiling_status, subscription_usage, +) def _now_utc(): @@ -135,31 +116,6 @@ CLAUDE = claude_bin() _ENV = {**os.environ, "HOME": "/home/chaim"} -def subscription_usage() -> dict | None: - """Read the claude.ai subscription usage — the exact 5-hour / 7-day - utilization the Claude Code UI shows — from the OAuth usage endpoint. - - Returns the parsed JSON (keys: five_hour, seven_day, seven_day_opus, - seven_day_sonnet, extra_usage; each window → {utilization 0-100, resets_at}) - or None on ANY failure. Undocumented endpoint — every caller must tolerate - None and fall back.""" - try: - with open(CLAUDE_CRED_PATH) as f: - token = json.load(f)["claudeAiOauth"]["accessToken"] - except Exception: - return None - req = urllib.request.Request(OAUTH_USAGE_URL, headers={ - "Authorization": f"Bearer {token}", - "User-Agent": _USAGE_UA, # required — else aggressive 429 - "anthropic-beta": "oauth-2025-04-20", - }) - try: - with urllib.request.urlopen(req, timeout=15) as resp: - return json.loads(resp.read().decode("utf-8")) - except Exception: - return None - - def quota_available() -> bool: """Is the claude.ai quota actually usable right now? @@ -176,14 +132,10 @@ def quota_available() -> bool: timeout, or limit message → treat as still limited.""" usage = subscription_usage() if usage is not None: - # A drain run needs the 5-hour window, the weekly all-models cap, AND - # the weekly per-model cap all below their ceilings. On this account the - # per-model cap that's actually populated is Sonnet (seven_day_opus is - # null — no separate Opus cap); the all-models seven_day cap is the - # backstop for Opus usage either way. null utilization → treated as 0%. - utils = {w: (usage.get(w) or {}).get("utilization") for w in USAGE_CEILINGS} - # utilization may be None (window inactive / no data) → treat as 0%. - return all((u or 0) < USAGE_CEILINGS[w] for w, u in utils.items()) + # All gated windows (5-hour, weekly all-models, weekly-Sonnet) must be + # below their ceilings — same evaluation the drain uses (ceiling_status). + over, _, _ = ceiling_status(usage) + return not over # ── fallback: official-CLI probe ── try: r = subprocess.run([CLAUDE, "-p", "Reply with exactly: OK"], @@ -210,23 +162,13 @@ def quota_exhausted(): Returns (exhausted: bool, earliest_reset_utc: datetime|None), or None when the endpoint is unreachable (caller falls back to the log scrape). A window counts as exhausting the drain at >= its USAGE_CEILINGS ceiling (the chair's soft - stop-before-429 thresholds) — same windows quota_available gates on (5-hour, - weekly all-models, weekly-Sonnet).""" + stop-before-429 thresholds) — the SAME evaluation the drain gates on + (ceiling_status).""" usage = subscription_usage() if usage is None: return None - exhausted, resets = False, [] - for w, ceiling in USAGE_CEILINGS.items(): - info = usage.get(w) or {} - if (info.get("utilization") or 0) >= ceiling: - exhausted = True - r = info.get("resets_at") - if r: - try: - resets.append(datetime.fromisoformat(r).astimezone(timezone.utc)) - except Exception: - pass - return exhausted, (min(resets) if resets else None) + over, reset, _ = ceiling_status(usage) + return over, reset # ── DB access (via the repo venv; the module self-configures) ──────────────── @@ -544,15 +486,24 @@ def tick(): notes.append("התור ריק — אין מה לחלץ.") elif in_cooldown: mode = "weekly_exhausted" if weekly else "ratelimited" - # Stop a running drain while limited — otherwise it keeps spawning Opus - # calls that 429 on every chunk, burning the very quota we're waiting on - # (and burying the 429 signal under teardown noise). It re-ignites via the - # normal trigger path once cooldown clears. - if status == "online": + # Two cooldown causes, two responses: + # • FRESH real 429 (log_rl) — the drain is literally failing on every + # chunk, burning the quota we're waiting on and re-extracting/degrading + # completed cases. HARD-KILL it (pm2 stop); re-ignites once cooldown + # clears. + # • SOFT ceiling (≥75%/65% with no fresh 429) — there's still quota + # headroom, so DON'T kill: the drain reads the SAME ceiling and stops + # itself between cases, finishing the in-flight case cleanly. We just + # hold (never re-trigger while in_cooldown). + if status == "online" and log_rl: stop_drain() action = "hold-stopped" - notes.append(f"rate-limit פעיל — הדריינר נעצר כדי לא לבזבז מכסה על 429; " + notes.append(f"429 טרי — הדריינר נקטל כדי לא להלום rate-limit; " f"איפוס ~{cd_dt.astimezone(IDT):%H:%M IDT}.") + elif status == "online": + action = "hold-soft" + notes.append(f"סף-ניצול נחצה (אין 429 טרי) — הדריינר יסיים את התיק " + f"הנוכחי ויעצור לבד; איפוס ~{cd_dt.astimezone(IDT):%H:%M IDT}.") else: action = "hold" notes.append(f"rate-limit פעיל; איפוס ~{cd_dt.astimezone(IDT):%H:%M IDT}.")