feat(halacha): עצירה-רכה של הדריינר בסף-ניצול (75/65) + מקור-אמת יחיד למכסה (#265)
Co-authored-by: Chaim <chaim@marcus-law.co.il> Co-committed-by: Chaim <chaim@marcus-law.co.il>
This commit was merged in pull request #265.
This commit is contained in:
103
mcp-server/src/legal_mcp/services/usage_limits.py
Normal file
103
mcp-server/src/legal_mcp/services/usage_limits.py
Normal file
@@ -0,0 +1,103 @@
|
||||
"""claude.ai subscription-usage ceilings — the single source of truth.
|
||||
|
||||
ONE place that reads the (undocumented) OAuth usage endpoint and decides whether
|
||||
a usage window has crossed its soft stop-before-429 ceiling. Imported by BOTH the
|
||||
halacha drain (`scripts/drain_halacha_queue.py`) and its supervisor
|
||||
(`scripts/halacha_drain_supervisor.py`) so the two never drift (G1/G2).
|
||||
|
||||
STRICTLY stdlib — no asyncpg / aiohttp / config imports. The supervisor runs as
|
||||
plain system ``python3`` and imports this module directly; pulling in heavy deps
|
||||
here would break that import. (``legal_mcp/__init__`` and ``services/__init__``
|
||||
are intentionally empty, which is what makes the system-python import work.)
|
||||
|
||||
Soft ceilings (chair, 2026-06-15): stop the drain BEFORE a window exhausts so the
|
||||
in-flight case finishes on the remaining quota and the drain idles until reset,
|
||||
instead of hammering 429 (which burns retries and leaves cases half-extracted).
|
||||
5-hour ("hourly session") window stops at 75%, the weekly windows at 65%.
|
||||
Overridable via env for ops tuning without a redeploy.
|
||||
"""
|
||||
import json
|
||||
import os
|
||||
import urllib.request
|
||||
from datetime import datetime, timezone
|
||||
|
||||
# claude.ai subscription usage. The token lives in the CLI's own credentials
|
||||
# file; the claude-code User-Agent is REQUIRED — without it the request lands in
|
||||
# an aggressively rate-limited bucket and 429s. Unofficial endpoint: may change,
|
||||
# so every caller must tolerate a None return and fall back.
|
||||
CLAUDE_CRED_PATH = "/home/chaim/.claude/.credentials.json"
|
||||
OAUTH_USAGE_URL = "https://api.anthropic.com/api/oauth/usage"
|
||||
USAGE_UA = "claude-code/2.1.177"
|
||||
|
||||
|
||||
def _env_int(name: str, default: int) -> int:
|
||||
try:
|
||||
return int(os.environ.get(name, default))
|
||||
except (TypeError, ValueError):
|
||||
return default
|
||||
|
||||
|
||||
# Reaching a ceiling is treated EXACTLY like 100% exhaustion (cooldown until that
|
||||
# window's resets_at). Both weekly keys share one threshold; the per-model cap
|
||||
# that's actually populated on this account is Sonnet (seven_day_opus is null) and
|
||||
# the all-models seven_day cap is the backstop for Opus usage either way.
|
||||
CEILING_FIVE_HOUR = _env_int("HALACHA_DRAIN_CEILING_5H", 75)
|
||||
CEILING_WEEKLY = _env_int("HALACHA_DRAIN_CEILING_WEEKLY", 65)
|
||||
USAGE_CEILINGS = {
|
||||
"five_hour": CEILING_FIVE_HOUR,
|
||||
"seven_day": CEILING_WEEKLY,
|
||||
"seven_day_sonnet": CEILING_WEEKLY,
|
||||
}
|
||||
|
||||
|
||||
def subscription_usage() -> dict | None:
|
||||
"""Read the claude.ai subscription usage — the exact 5-hour / 7-day
|
||||
utilization the Claude Code UI shows — from the OAuth usage endpoint.
|
||||
|
||||
Returns the parsed JSON (keys: five_hour, seven_day, seven_day_opus,
|
||||
seven_day_sonnet, extra_usage; each window → {utilization 0-100, resets_at})
|
||||
or None on ANY failure. Undocumented endpoint — every caller must tolerate
|
||||
None and fall back."""
|
||||
try:
|
||||
with open(CLAUDE_CRED_PATH) as f:
|
||||
token = json.load(f)["claudeAiOauth"]["accessToken"]
|
||||
except Exception:
|
||||
return None
|
||||
req = urllib.request.Request(OAUTH_USAGE_URL, headers={
|
||||
"Authorization": f"Bearer {token}",
|
||||
"User-Agent": USAGE_UA, # required — else aggressive 429
|
||||
"anthropic-beta": "oauth-2025-04-20",
|
||||
})
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=15) as resp:
|
||||
return json.loads(resp.read().decode("utf-8"))
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def ceiling_status(usage: dict) -> tuple[bool, datetime | None, str]:
|
||||
"""Evaluate an already-fetched usage dict against USAGE_CEILINGS.
|
||||
|
||||
Returns (over, earliest_reset_utc, detail):
|
||||
• over — True iff ANY gated window is at/above its ceiling
|
||||
• earliest_reset — soonest resets_at among the windows that are over (UTC),
|
||||
or None
|
||||
• detail — short log string, e.g. "5h=78%/75 weekly=40%/65"
|
||||
|
||||
Takes the usage dict as a parameter (does NOT fetch) so the caller owns the
|
||||
single network read. null utilization → treated as 0% (window inactive)."""
|
||||
over, resets, parts = False, [], []
|
||||
label = {"five_hour": "5h", "seven_day": "weekly", "seven_day_sonnet": "weekly-sonnet"}
|
||||
for w, ceiling in USAGE_CEILINGS.items():
|
||||
info = usage.get(w) or {}
|
||||
util = info.get("utilization") or 0
|
||||
parts.append(f"{label.get(w, w)}={util:.0f}%/{ceiling}")
|
||||
if util >= ceiling:
|
||||
over = True
|
||||
r = info.get("resets_at")
|
||||
if r:
|
||||
try:
|
||||
resets.append(datetime.fromisoformat(r).astimezone(timezone.utc))
|
||||
except Exception:
|
||||
pass
|
||||
return over, (min(resets) if resets else None), " ".join(parts)
|
||||
@@ -114,9 +114,9 @@
|
||||
| `run_curator_deepseek_test_v2.sh` | A/B test #2 (2026-05-05) — אותו run אבל עם interaction. תוצאה: 9:08 דק׳, 5 ממצאים, היחיד מ-4 הריצות שזיהה תוצאה עובדתית נכונה (קבלה חלקית). interaction נכשל ב-API ("Agent run id required" בריצה ידנית). | החלפת Curator לאדפטר DeepSeek מקומי |
|
||||
| `run_curator_sonnet_rerun.sh` | A/B test #3 (2026-05-05) — ריצה חוזרת של Sonnet 4.5 על אותו CMP-78. תוצאה: 12:52 דק׳ (לעומת 20:13 בריצה המקורית — כי בלי לולאת interaction.json). זיהה תוצאה שגויה ("דחייה") **בעקביות עם הריצה המקורית** — Sonnet עקבי-בטעות, DeepSeek אקראי. | בדיקה חד-פעמית — לא להריץ שוב |
|
||||
| `ingest_incoming_batch.py` | python | קליטת batch של החלטות ועדת ערר מ-`data/precedents/incoming/` דרך המסלול הקנוני (`ingest_internal_decision`) + חילוץ מטא-דאטה לכל תיק (המסלול הפנימי לא מתזמן metadata — INV-ING3). רצף (לא מקבילי, להימנע מעומס CLI). רשימת `DECISIONS` נערכת ידנית לכל batch. config מ-`~/.env`. תומך תהליך [[project_precedent_incoming_workflow]]. | ידני, per-batch (חלופה ל-MCP `internal_decision_upload` כש-batch גדול) |
|
||||
| `drain_halacha_queue.py` | python | ריקון תור חילוץ ההלכות (`process_pending_extractions kind='halacha'`) ב-batches של 4 עד שהתור ריק (2 סבבים ריקים). **רץ רק בחלון-לילה 23:00–05:00 שעון ישראל** (`_in_window`, zoneinfo DST-safe — המכונה UTC); מחוץ לחלון `===SKIP===`, ונעצר `===STOP===` כשהחלון נסגר (השאר ממשיך בלילה הבא, FIFO+checkpoint). env: `HALACHA_DRAIN_WINDOW_START`/`_END`/`HALACHA_DRAIN_TZ`. **kill-switch `/operations`:** בודק `is_drain_disabled` בעלייה **וגם בתחילת כל סבב** — כיבוי באמצע-ריצה עוצר את הלולאה בגבול-הסבב הבא (התהליך עצמו נהרג מיד דרך ה-UI-toggle/סופרוייזר). חילוץ-הלכות נשאר על claude_session (לא Gemini). self-heal ל-orphaned `processing`. ההלכות נוחתות `pending_review` (שער-יו"ר). **חילוץ תיק-בודד שהיו"ר מבקש רץ מיד דרך ה-CEO (`precedent_extract_halachot`) ואינו מגודר כאן.** | דרך `legal-halacha-drain.config.cjs` (pm2 cron) / ידני |
|
||||
| `drain_halacha_queue.py` | python | ריקון תור חילוץ ההלכות (`process_pending_extractions kind='halacha'`) **תיק-בודד לסבב (`limit=1`)** עד שהתור ריק (2 סבבים ריקים), עם קצב `pl.INTER_PRECEDENT_COOLDOWN_SEC` בין תיקים (נשמר אחרי המעבר ל-limit=1 שביטל את המרווח הפנימי-לסבב). **רץ רק בחלון-לילה 23:00–05:00 שעון ישראל** (`_in_window`, zoneinfo DST-safe — המכונה UTC); מחוץ לחלון `===SKIP===`, ונעצר `===STOP===` כשהחלון נסגר (השאר ממשיך בלילה הבא, FIFO+checkpoint). env: `HALACHA_DRAIN_WINDOW_START`/`_END`/`HALACHA_DRAIN_TZ`. **שער-סף-ניצול (נוהל-יו"ר):** בין כל תיק בודק את ניצול-claude.ai דרך מקור-האמת המשותף `legal_mcp.services.usage_limits` (`USAGE_CEILINGS` — אותו שה-supervisor קורא) ועוצר `===STOP=== usage ceiling reached` כשנחצה סף-רך (5-שעות ≥75% / שבועי ≥65%, env `HALACHA_DRAIN_CEILING_5H`/`_WEEKLY`). הבדיקה **בין** תיקים → התיק שרץ תמיד מסתיים על המכסה שנותרה ופשוט לא מתחיל חדש — עצירה **לפני** 429, לא קטילה. fail-OPEN אם ה-endpoint לא-זמין (ה-supervisor מגבה בקטילת-429). **kill-switch `/operations`:** בודק `is_drain_disabled` בעלייה **וגם בתחילת כל סבב** — כיבוי באמצע-ריצה עוצר את הלולאה בגבול-הסבב הבא (התהליך עצמו נהרג מיד דרך ה-UI-toggle/סופרוייזר). חילוץ-הלכות נשאר על claude_session (לא Gemini). self-heal ל-orphaned `processing`. ההלכות נוחתות `pending_review` (שער-יו"ר). **חילוץ תיק-בודד שהיו"ר מבקש רץ מיד דרך ה-CEO (`precedent_extract_halachot`) ואינו מגודר כאן.** | דרך `legal-halacha-drain.config.cjs` (pm2 cron) / ידני |
|
||||
| `legal-halacha-drain.config.cjs` | pm2/js | **תזמון חלון-לילה של `drain_halacha_queue.py`** (cron UTC `10 20,21,22,23,0,1,2,3 * * *` = superset שמכסה את 23:00–05:00 ישראל בקיץ ובחורף; הסקריפט גוזם לחלון המדויק ב-zoneinfo). דקת-הצתה `:10` (לא `:00`) כדי לא לחלוק דקה עם metadata-drain (`:00`) או supervisor (`:05`) — מונע deadlock של DDL-המיגרציה כששני דריינים עולים יחד. `HALACHA_DRAIN_CRON` לעקיפה. ירייה כל שעה גם מחדשת one-shot שמת באמצע (advisory-lock הופך חפיפה לבטוחה). דורש claude CLI. התקנה: `pm2 start scripts/legal-halacha-drain.config.cjs && pm2 save`. | pm2 cron (host-side) |
|
||||
| `halacha_drain_supervisor.py` | python | **מנהל-בריאות קבוע ל-`legal-halacha-drain`** (אפס צריכת-Claude — קורא DB/לוגים/pm2 ומצית את הדריינר הקיים). טיק יחיד: **מכבד `is_drain_disabled` בעדיפות עליונה — אם כבוי ב-/operations עוצר את הדריינר ולא מצית** · מצית כשבטל+תור≠ריק · restart ל-run תקוע (liveness לפי checkpoints-per-chunk, **לא** mtime-לוג שמתעדכן רק בסיום תיק ~10 דק') · **זיהוי rate-limit: ה-PRIMARY הוא `quota_exhausted()` הסמכותי (קורא `subscription_usage()` — endpoint לא-מתועד `GET /api/oauth/usage`, token מ-`~/.claude/.credentials.json` + UA `claude-code/*` חובה אחרת 429 — אותו אחוז-ניצול 5-שעות/שבועי/שבועי-Sonnet שה-UI מציג. **ספי-עצירה-רכים `USAGE_CEILINGS` (נוהל-יו"ר 2026-06-15): עוצר לפני 429 — 5-שעות ≥75%, שבועי+שבועי-Sonnet ≥65% (עקיפה: `HALACHA_DRAIN_CEILING_5H`/`HALACHA_DRAIN_CEILING_WEEKLY`); הגעה לסף = בדיוק כמו מיצוי, cooldown עד `resets_at` של אותו חלון. הנימוק: 429 באמצע-תיק כופה חילוץ-מחדש של תיק שהושלם ומשחית אותו**), durable ואינו תלוי בעומק-זנב-הלוג; `scan_rate_limit` (429 + parse איפוס, מגודר-טריות) רק כ-fallback כש-endpoint לא-זמין — תיקון הבאג שבו 429 שגלל מתחת ל-churn של ה-restart נקרא כ"hung" וגרר restart-storm שבזבז מכסה.** בזמן מוגבל **עוצר את הדריינר** (`hold-stopped`) כדי לא להלום 429, ומצית-מחדש כשהמכסה חוזרת (כל החלונות חזרו מתחת לסף; fallback `claude -p` זעיר אם ה-endpoint לא זמין). `status` מדפיס ניצול/סף-עצירה · **חלון catch-up בוקר `[05:00–07:00 IDT)` (`CATCHUP_END`) שנפתח רק לניקוי backlog שנותר כשהמכסה חזרה — איפוס-5-שעות מאוחר נוחת לרוב מעט אחרי 05:00; מגודר ב-(לא-מוגבל)+(תור≠ריק), והקצה המורחב מועבר לדריינר כך ש-window-self-guard שלו מקבל את סבבי-ה-catch-up** · מאמת ש-staging מתחייב. **BURST** (חלון "רוץ ברצף עכשיו" ידני): מקור-אמת = `drain_controls.burst_until` ב-DB — אותו ערך ש-/operations קורא/כותב (G1 מקור-יחיד, G2 בלי מסלול מקביל); בעתיד→חלון מורם, אחרת חלון-לילה 23-05; פג-תוקף אוטומטי במועד. תת-פקודות: `tick` (ברירת-מחדל), `burst-on [--until]`, `burst-off`, `status`. | דרך `legal-halacha-supervisor.config.cjs` (pm2 cron) / ידני / כפתור /operations |
|
||||
| `halacha_drain_supervisor.py` | python | **מנהל-בריאות קבוע ל-`legal-halacha-drain`** (אפס צריכת-Claude — קורא DB/לוגים/pm2 ומצית את הדריינר הקיים). טיק יחיד: **מכבד `is_drain_disabled` בעדיפות עליונה — אם כבוי ב-/operations עוצר את הדריינר ולא מצית** · מצית כשבטל+תור≠ריק · restart ל-run תקוע (liveness לפי checkpoints-per-chunk, **לא** mtime-לוג שמתעדכן רק בסיום תיק ~10 דק') · **זיהוי rate-limit: ה-PRIMARY הוא `quota_exhausted()` הסמכותי (wrapper דק מעל מקור-האמת המשותף `legal_mcp.services.usage_limits` — `subscription_usage()`+`ceiling_status()`+`USAGE_CEILINGS`, אותו שהדריינר קורא, G1/G2 בלי כפילות; endpoint לא-מתועד `GET /api/oauth/usage`, token מ-`~/.claude/.credentials.json` + UA `claude-code/*` חובה אחרת 429 — אותו אחוז-ניצול 5-שעות/שבועי/שבועי-Sonnet שה-UI מציג. **ספי-עצירה-רכים `USAGE_CEILINGS` (נוהל-יו"ר 2026-06-15): עוצר לפני 429 — 5-שעות ≥75%, שבועי+שבועי-Sonnet ≥65% (עקיפה: `HALACHA_DRAIN_CEILING_5H`/`HALACHA_DRAIN_CEILING_WEEKLY`); הגעה לסף = בדיוק כמו מיצוי, cooldown עד `resets_at` של אותו חלון. הנימוק: 429 באמצע-תיק כופה חילוץ-מחדש של תיק שהושלם ומשחית אותו**), durable ואינו תלוי בעומק-זנב-הלוג; `scan_rate_limit` (429 + parse איפוס, מגודר-טריות) רק כ-fallback כש-endpoint לא-זמין — תיקון הבאג שבו 429 שגלל מתחת ל-churn של ה-restart נקרא כ"hung" וגרר restart-storm שבזבז מכסה.** **שתי תגובות ל-cooldown:** בסף-רך (≥75/65, **בלי** 429 טרי) — `hold-soft`: **אינו קוטל**, הדריינר קורא את אותו סף ועוצר לבד בין תיקים אחרי שמסיים את התיק הנוכחי (יש מכסה לסיים); קטילה (`hold-stopped`, `pm2 stop`) רק ב-**429 טרי אמיתי** (`log_rl` — הקריאות באמת נכשלות). מצית-מחדש כשהמכסה חוזרת (כל החלונות מתחת לסף; fallback `claude -p` זעיר אם ה-endpoint לא זמין). `status` מדפיס ניצול/סף-עצירה · **חלון catch-up בוקר `[05:00–07:00 IDT)` (`CATCHUP_END`) שנפתח רק לניקוי backlog שנותר כשהמכסה חזרה — איפוס-5-שעות מאוחר נוחת לרוב מעט אחרי 05:00; מגודר ב-(לא-מוגבל)+(תור≠ריק), והקצה המורחב מועבר לדריינר כך ש-window-self-guard שלו מקבל את סבבי-ה-catch-up** · מאמת ש-staging מתחייב. **BURST** (חלון "רוץ ברצף עכשיו" ידני): מקור-אמת = `drain_controls.burst_until` ב-DB — אותו ערך ש-/operations קורא/כותב (G1 מקור-יחיד, G2 בלי מסלול מקביל); בעתיד→חלון מורם, אחרת חלון-לילה 23-05; פג-תוקף אוטומטי במועד. תת-פקודות: `tick` (ברירת-מחדל), `burst-on [--until]`, `burst-off`, `status`. | דרך `legal-halacha-supervisor.config.cjs` (pm2 cron) / ידני / כפתור /operations |
|
||||
| `legal-halacha-supervisor.config.cjs` | pm2/js | **תזמון כל 15 דק' של `halacha_drain_supervisor.py`** (cron `5-59/15 * * * *` = `:05,:20,:35,:50`, `HALACHA_SUPERVISOR_CRON` לעקיפה; דקת-הצתה `:05` כדי לא לחלוק דקה עם metadata-drain `:00` או halacha-drain `:10` — מונע deadlock של DDL-המיגרציה). `autorestart:false` (one-shot per tick). מצב-state ב-`~/halacha-drain-monitor/` (מחוץ ל-repo). התקנה: `pm2 start scripts/legal-halacha-supervisor.config.cjs && pm2 save`. | pm2 cron (host-side) |
|
||||
| `ingest_digests_batch.py` | python | קליטת batch של יומוני "כל יום" מ-`data/digests/incoming/` דרך המסלול העצמאי של קורפוס-הגילוי (`digest_library.ingest_digest`) — חילוץ-LLM (תג-מושג, כותרת-הלכה, מראה-מקום, שני-תאריכים), embedding יחיד, ו-autolink לפסק המקורי (X12/INV-DIG3). רצף (לא מקבילי). מזהה-יומון+תאריך נגזרים משם-הקובץ; העלון החודשי מדולג. **לא מעביר קבצים** — ה-DB (content_hash) הוא מקור-האמת היחיד; הרצה חוזרת מדלגת על קיימים (`exists`). config מ-`~/.env`. | ידני, per-batch (חלופה ל-MCP `digest_upload`) |
|
||||
| `drain_digests.py` | python | ריקון תור ההעשרה של יומונים (X12): מעבד כל digest בסטטוס `pending` דרך `digest_library.enrich_digest` (חילוץ-LLM Sonnet + embedding + autolink). מקבילי (CONCURRENCY=3, env-tunable), idempotent. מוסיף `~/.local/bin` ל-PATH כדי שה-claude CLI יימצא תחת cron. בודק דגל `drain_controls('legal-digest-drain')` ב-startup → no-op כשכבוי מ-/operations. | דרך `legal-digest-drain.config.cjs` (pm2 cron) + ידני אחרי backfill. חלופת-MCP: `digest_process_pending` |
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"""Drain the halacha extraction queue for the incoming batch.
|
||||
|
||||
Calls the canonical process_pending_extractions(kind='halacha') in small batches
|
||||
Calls the canonical process_pending_extractions(kind='halacha') ONE case per round
|
||||
until the queue is empty (two consecutive zero-progress rounds). Serial + global
|
||||
advisory-lock coordinated inside the service — avoids concurrent Claude load spikes.
|
||||
|
||||
@@ -14,6 +14,14 @@ requested by the chair goes through the CEO immediately and is NOT gated here.
|
||||
Window is DST-safe (zoneinfo) — the host runs in UTC. Env overrides:
|
||||
HALACHA_DRAIN_WINDOW_START / _END (hours, 0–23) · HALACHA_DRAIN_TZ.
|
||||
|
||||
USAGE-CEILING: between cases the drain checks the claude.ai utilization and stops
|
||||
GRACEFULLY (===STOP===) once a soft ceiling is crossed (5-hour ≥75% / weekly ≥65%,
|
||||
USAGE_CEILINGS in legal_mcp.services.usage_limits — the SAME source the supervisor
|
||||
reads). Because it's checked between cases, the in-flight case always finishes on
|
||||
the remaining quota; the drain just doesn't start a new one — stopping BEFORE a 429
|
||||
rather than hammering it. Resumes when the window resets. Env overrides:
|
||||
HALACHA_DRAIN_CEILING_5H / _WEEKLY.
|
||||
|
||||
Run: mcp-server/.venv/bin/python scripts/drain_halacha_queue.py
|
||||
"""
|
||||
|
||||
@@ -27,6 +35,7 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "mcp-server", "
|
||||
|
||||
from legal_mcp.services import db
|
||||
from legal_mcp.services import precedent_library as pl
|
||||
from legal_mcp.services import usage_limits
|
||||
|
||||
_TZ = ZoneInfo(os.environ.get("HALACHA_DRAIN_TZ", "Asia/Jerusalem"))
|
||||
_WINDOW_START = int(os.environ.get("HALACHA_DRAIN_WINDOW_START", "23"))
|
||||
@@ -74,8 +83,26 @@ async def main():
|
||||
print(f"===STOP=== drain window closed ({_WINDOW_END:02d}:00) — "
|
||||
f"{total} cases this run; rest resumes next night", flush=True)
|
||||
break
|
||||
# Soft usage-ceiling gate (chair: 5-hour ≥75% / weekly ≥65%). Checked
|
||||
# BETWEEN cases (limit=1 below), so the in-flight case always finishes on
|
||||
# the remaining quota and we just don't start a new one — stopping BEFORE
|
||||
# a 429 instead of hammering it. Same USAGE_CEILINGS the supervisor reads
|
||||
# (single source of truth: legal_mcp.services.usage_limits).
|
||||
# FAIL-OPEN on endpoint failure (returns None): keep draining — the
|
||||
# supervisor backstops a real 429 by hard-killing. Do NOT "fix" this to
|
||||
# fail-closed; a throttled endpoint must not halt the night's drain.
|
||||
usage = await asyncio.get_event_loop().run_in_executor(
|
||||
None, usage_limits.subscription_usage)
|
||||
if usage is not None:
|
||||
over, _, detail = usage_limits.ceiling_status(usage)
|
||||
if over:
|
||||
print(f"===STOP=== usage ceiling reached ({detail}) — finished "
|
||||
f"{total} case(s); resumes when quota resets", flush=True)
|
||||
break
|
||||
rnd += 1
|
||||
out = await pl.process_pending_extractions(kind="halacha", limit=4)
|
||||
# limit=1 → the ceiling (and window/disabled) are re-checked between every
|
||||
# case, so a soft stop never abandons an in-flight extraction.
|
||||
out = await pl.process_pending_extractions(kind="halacha", limit=1)
|
||||
processed = out.get("processed", 0)
|
||||
total_pending = out.get("total_pending", 0)
|
||||
total += processed
|
||||
@@ -87,6 +114,11 @@ async def main():
|
||||
await asyncio.sleep(5)
|
||||
else:
|
||||
empty_rounds = 0
|
||||
# Preserve the inter-case spacer that process_pending_extractions
|
||||
# applies WITHIN a batch (only when idx>0) — with limit=1 every batch
|
||||
# is one case, so it never fires there; pace here instead to avoid
|
||||
# 429 bursts. Single source: the service's own cooldown constant.
|
||||
await asyncio.sleep(pl.INTER_PRECEDENT_COOLDOWN_SEC)
|
||||
print(f"\n===DONE=== total halachot rounds processed; cases handled cumulatively={total}", flush=True)
|
||||
|
||||
|
||||
|
||||
@@ -49,19 +49,12 @@ import json
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
import urllib.request
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from glob import glob
|
||||
|
||||
REPO = "/home/chaim/legal-ai"
|
||||
# claude.ai subscription usage — the same 5-hour / 7-day utilization the Claude
|
||||
# Code status bar shows, via the (undocumented) OAuth usage endpoint. The token
|
||||
# lives in the CLI's own credentials file; the claude-code User-Agent is
|
||||
# REQUIRED — without it the request lands in an aggressively rate-limited bucket
|
||||
# and 429s. Unofficial endpoint: may change, so callers must tolerate None.
|
||||
CLAUDE_CRED_PATH = "/home/chaim/.claude/.credentials.json"
|
||||
OAUTH_USAGE_URL = "https://api.anthropic.com/api/oauth/usage"
|
||||
_USAGE_UA = "claude-code/2.1.177"
|
||||
RUNTIME_DIR = "/home/chaim/halacha-drain-monitor" # state (outside repo)
|
||||
STATE = os.path.join(RUNTIME_DIR, "state.json")
|
||||
DRAIN = "legal-halacha-drain"
|
||||
@@ -77,29 +70,17 @@ NIGHT_START, NIGHT_END = 23, 5 # the drain's normal window (IDT hours)
|
||||
CATCHUP_END = 7 # soft window end (IDT) for early-morning catch-up — see fix B
|
||||
|
||||
|
||||
def _env_int(name: str, default: int) -> int:
|
||||
try:
|
||||
return int(os.environ.get(name, default))
|
||||
except (TypeError, ValueError):
|
||||
return default
|
||||
|
||||
|
||||
# Soft utilization ceilings — stop the drain BEFORE a window actually exhausts
|
||||
# (429s). Hitting a 429 mid-case forces re-extraction of an already-completed
|
||||
# case under the rate limit, DEGRADING it; stopping at the chair's ceilings instead
|
||||
# lets the in-flight halacha case finish cleanly and the drain idle until the
|
||||
# window resets. Reaching a ceiling is treated EXACTLY like 100% exhaustion
|
||||
# (cooldown until that window's resets_at). Per the chair (2026-06-15): the 5-hour
|
||||
# ("hourly session") window stops at 75%, the weekly windows at 65%. Both keys map
|
||||
# to the same windows quota_available / quota_exhausted gate on; overridable via
|
||||
# env for ops tuning without a redeploy.
|
||||
CEILING_FIVE_HOUR = _env_int("HALACHA_DRAIN_CEILING_5H", 75)
|
||||
CEILING_WEEKLY = _env_int("HALACHA_DRAIN_CEILING_WEEKLY", 65)
|
||||
USAGE_CEILINGS = {
|
||||
"five_hour": CEILING_FIVE_HOUR,
|
||||
"seven_day": CEILING_WEEKLY,
|
||||
"seven_day_sonnet": CEILING_WEEKLY,
|
||||
}
|
||||
# Usage-ceiling logic (subscription_usage / ceiling_status / USAGE_CEILINGS) is
|
||||
# the SINGLE source of truth, shared with the drain — see
|
||||
# legal_mcp/services/usage_limits.py. The supervisor runs as system python3, so
|
||||
# put the (stdlib-only) package on the path before importing it. Resolve relative
|
||||
# to THIS file (same as drain_halacha_queue.py) so the module is loaded from the
|
||||
# same checkout the script lives in.
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)),
|
||||
"..", "mcp-server", "src"))
|
||||
from legal_mcp.services.usage_limits import ( # noqa: E402
|
||||
USAGE_CEILINGS, ceiling_status, subscription_usage,
|
||||
)
|
||||
|
||||
|
||||
def _now_utc():
|
||||
@@ -135,31 +116,6 @@ CLAUDE = claude_bin()
|
||||
_ENV = {**os.environ, "HOME": "/home/chaim"}
|
||||
|
||||
|
||||
def subscription_usage() -> dict | None:
|
||||
"""Read the claude.ai subscription usage — the exact 5-hour / 7-day
|
||||
utilization the Claude Code UI shows — from the OAuth usage endpoint.
|
||||
|
||||
Returns the parsed JSON (keys: five_hour, seven_day, seven_day_opus,
|
||||
seven_day_sonnet, extra_usage; each window → {utilization 0-100, resets_at})
|
||||
or None on ANY failure. Undocumented endpoint — every caller must tolerate
|
||||
None and fall back."""
|
||||
try:
|
||||
with open(CLAUDE_CRED_PATH) as f:
|
||||
token = json.load(f)["claudeAiOauth"]["accessToken"]
|
||||
except Exception:
|
||||
return None
|
||||
req = urllib.request.Request(OAUTH_USAGE_URL, headers={
|
||||
"Authorization": f"Bearer {token}",
|
||||
"User-Agent": _USAGE_UA, # required — else aggressive 429
|
||||
"anthropic-beta": "oauth-2025-04-20",
|
||||
})
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=15) as resp:
|
||||
return json.loads(resp.read().decode("utf-8"))
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def quota_available() -> bool:
|
||||
"""Is the claude.ai quota actually usable right now?
|
||||
|
||||
@@ -176,14 +132,10 @@ def quota_available() -> bool:
|
||||
timeout, or limit message → treat as still limited."""
|
||||
usage = subscription_usage()
|
||||
if usage is not None:
|
||||
# A drain run needs the 5-hour window, the weekly all-models cap, AND
|
||||
# the weekly per-model cap all below their ceilings. On this account the
|
||||
# per-model cap that's actually populated is Sonnet (seven_day_opus is
|
||||
# null — no separate Opus cap); the all-models seven_day cap is the
|
||||
# backstop for Opus usage either way. null utilization → treated as 0%.
|
||||
utils = {w: (usage.get(w) or {}).get("utilization") for w in USAGE_CEILINGS}
|
||||
# utilization may be None (window inactive / no data) → treat as 0%.
|
||||
return all((u or 0) < USAGE_CEILINGS[w] for w, u in utils.items())
|
||||
# All gated windows (5-hour, weekly all-models, weekly-Sonnet) must be
|
||||
# below their ceilings — same evaluation the drain uses (ceiling_status).
|
||||
over, _, _ = ceiling_status(usage)
|
||||
return not over
|
||||
# ── fallback: official-CLI probe ──
|
||||
try:
|
||||
r = subprocess.run([CLAUDE, "-p", "Reply with exactly: OK"],
|
||||
@@ -210,23 +162,13 @@ def quota_exhausted():
|
||||
Returns (exhausted: bool, earliest_reset_utc: datetime|None), or None when the
|
||||
endpoint is unreachable (caller falls back to the log scrape). A window counts
|
||||
as exhausting the drain at >= its USAGE_CEILINGS ceiling (the chair's soft
|
||||
stop-before-429 thresholds) — same windows quota_available gates on (5-hour,
|
||||
weekly all-models, weekly-Sonnet)."""
|
||||
stop-before-429 thresholds) — the SAME evaluation the drain gates on
|
||||
(ceiling_status)."""
|
||||
usage = subscription_usage()
|
||||
if usage is None:
|
||||
return None
|
||||
exhausted, resets = False, []
|
||||
for w, ceiling in USAGE_CEILINGS.items():
|
||||
info = usage.get(w) or {}
|
||||
if (info.get("utilization") or 0) >= ceiling:
|
||||
exhausted = True
|
||||
r = info.get("resets_at")
|
||||
if r:
|
||||
try:
|
||||
resets.append(datetime.fromisoformat(r).astimezone(timezone.utc))
|
||||
except Exception:
|
||||
pass
|
||||
return exhausted, (min(resets) if resets else None)
|
||||
over, reset, _ = ceiling_status(usage)
|
||||
return over, reset
|
||||
|
||||
|
||||
# ── DB access (via the repo venv; the module self-configures) ────────────────
|
||||
@@ -544,15 +486,24 @@ def tick():
|
||||
notes.append("התור ריק — אין מה לחלץ.")
|
||||
elif in_cooldown:
|
||||
mode = "weekly_exhausted" if weekly else "ratelimited"
|
||||
# Stop a running drain while limited — otherwise it keeps spawning Opus
|
||||
# calls that 429 on every chunk, burning the very quota we're waiting on
|
||||
# (and burying the 429 signal under teardown noise). It re-ignites via the
|
||||
# normal trigger path once cooldown clears.
|
||||
if status == "online":
|
||||
# Two cooldown causes, two responses:
|
||||
# • FRESH real 429 (log_rl) — the drain is literally failing on every
|
||||
# chunk, burning the quota we're waiting on and re-extracting/degrading
|
||||
# completed cases. HARD-KILL it (pm2 stop); re-ignites once cooldown
|
||||
# clears.
|
||||
# • SOFT ceiling (≥75%/65% with no fresh 429) — there's still quota
|
||||
# headroom, so DON'T kill: the drain reads the SAME ceiling and stops
|
||||
# itself between cases, finishing the in-flight case cleanly. We just
|
||||
# hold (never re-trigger while in_cooldown).
|
||||
if status == "online" and log_rl:
|
||||
stop_drain()
|
||||
action = "hold-stopped"
|
||||
notes.append(f"rate-limit פעיל — הדריינר נעצר כדי לא לבזבז מכסה על 429; "
|
||||
notes.append(f"429 טרי — הדריינר נקטל כדי לא להלום rate-limit; "
|
||||
f"איפוס ~{cd_dt.astimezone(IDT):%H:%M IDT}.")
|
||||
elif status == "online":
|
||||
action = "hold-soft"
|
||||
notes.append(f"סף-ניצול נחצה (אין 429 טרי) — הדריינר יסיים את התיק "
|
||||
f"הנוכחי ויעצור לבד; איפוס ~{cd_dt.astimezone(IDT):%H:%M IDT}.")
|
||||
else:
|
||||
action = "hold"
|
||||
notes.append(f"rate-limit פעיל; איפוס ~{cd_dt.astimezone(IDT):%H:%M IDT}.")
|
||||
|
||||
Reference in New Issue
Block a user