feat(halacha): עצירה-רכה של הדריינר בסף-ניצול (75/65) + מקור-אמת יחיד למכסה #265

Merged
chaim merged 1 commits from worktree-halacha-soft-ceiling-stop into main 2026-06-15 04:11:44 +00:00
4 changed files with 174 additions and 88 deletions

View File

@@ -0,0 +1,103 @@
"""claude.ai subscription-usage ceilings — the single source of truth.
ONE place that reads the (undocumented) OAuth usage endpoint and decides whether
a usage window has crossed its soft stop-before-429 ceiling. Imported by BOTH the
halacha drain (`scripts/drain_halacha_queue.py`) and its supervisor
(`scripts/halacha_drain_supervisor.py`) so the two never drift (G1/G2).
STRICTLY stdlib — no asyncpg / aiohttp / config imports. The supervisor runs as
plain system ``python3`` and imports this module directly; pulling in heavy deps
here would break that import. (``legal_mcp/__init__`` and ``services/__init__``
are intentionally empty, which is what makes the system-python import work.)
Soft ceilings (chair, 2026-06-15): stop the drain BEFORE a window exhausts so the
in-flight case finishes on the remaining quota and the drain idles until reset,
instead of hammering 429 (which burns retries and leaves cases half-extracted).
5-hour ("hourly session") window stops at 75%, the weekly windows at 65%.
Overridable via env for ops tuning without a redeploy.
"""
import json
import os
import urllib.request
from datetime import datetime, timezone
# claude.ai subscription usage. The token lives in the CLI's own credentials
# file; the claude-code User-Agent is REQUIRED — without it the request lands in
# an aggressively rate-limited bucket and 429s. Unofficial endpoint: may change,
# so every caller must tolerate a None return and fall back.
CLAUDE_CRED_PATH = "/home/chaim/.claude/.credentials.json"
OAUTH_USAGE_URL = "https://api.anthropic.com/api/oauth/usage"
USAGE_UA = "claude-code/2.1.177"
def _env_int(name: str, default: int) -> int:
try:
return int(os.environ.get(name, default))
except (TypeError, ValueError):
return default
# Reaching a ceiling is treated EXACTLY like 100% exhaustion (cooldown until that
# window's resets_at). Both weekly keys share one threshold; the per-model cap
# that's actually populated on this account is Sonnet (seven_day_opus is null) and
# the all-models seven_day cap is the backstop for Opus usage either way.
CEILING_FIVE_HOUR = _env_int("HALACHA_DRAIN_CEILING_5H", 75)
CEILING_WEEKLY = _env_int("HALACHA_DRAIN_CEILING_WEEKLY", 65)
USAGE_CEILINGS = {
"five_hour": CEILING_FIVE_HOUR,
"seven_day": CEILING_WEEKLY,
"seven_day_sonnet": CEILING_WEEKLY,
}
def subscription_usage() -> dict | None:
"""Read the claude.ai subscription usage — the exact 5-hour / 7-day
utilization the Claude Code UI shows — from the OAuth usage endpoint.
Returns the parsed JSON (keys: five_hour, seven_day, seven_day_opus,
seven_day_sonnet, extra_usage; each window → {utilization 0-100, resets_at})
or None on ANY failure. Undocumented endpoint — every caller must tolerate
None and fall back."""
try:
with open(CLAUDE_CRED_PATH) as f:
token = json.load(f)["claudeAiOauth"]["accessToken"]
except Exception:
return None
req = urllib.request.Request(OAUTH_USAGE_URL, headers={
"Authorization": f"Bearer {token}",
"User-Agent": USAGE_UA, # required — else aggressive 429
"anthropic-beta": "oauth-2025-04-20",
})
try:
with urllib.request.urlopen(req, timeout=15) as resp:
return json.loads(resp.read().decode("utf-8"))
except Exception:
return None
def ceiling_status(usage: dict) -> tuple[bool, datetime | None, str]:
"""Evaluate an already-fetched usage dict against USAGE_CEILINGS.
Returns (over, earliest_reset_utc, detail):
• over — True iff ANY gated window is at/above its ceiling
• earliest_reset — soonest resets_at among the windows that are over (UTC),
or None
• detail — short log string, e.g. "5h=78%/75 weekly=40%/65"
Takes the usage dict as a parameter (does NOT fetch) so the caller owns the
single network read. null utilization → treated as 0% (window inactive)."""
over, resets, parts = False, [], []
label = {"five_hour": "5h", "seven_day": "weekly", "seven_day_sonnet": "weekly-sonnet"}
for w, ceiling in USAGE_CEILINGS.items():
info = usage.get(w) or {}
util = info.get("utilization") or 0
parts.append(f"{label.get(w, w)}={util:.0f}%/{ceiling}")
if util >= ceiling:
over = True
r = info.get("resets_at")
if r:
try:
resets.append(datetime.fromisoformat(r).astimezone(timezone.utc))
except Exception:
pass
return over, (min(resets) if resets else None), " ".join(parts)

View File

@@ -114,9 +114,9 @@
| `run_curator_deepseek_test_v2.sh` | A/B test #2 (2026-05-05) — אותו run אבל עם interaction. תוצאה: 9:08 דק׳, 5 ממצאים, היחיד מ-4 הריצות שזיהה תוצאה עובדתית נכונה (קבלה חלקית). interaction נכשל ב-API ("Agent run id required" בריצה ידנית). | החלפת Curator לאדפטר DeepSeek מקומי | | `run_curator_deepseek_test_v2.sh` | A/B test #2 (2026-05-05) — אותו run אבל עם interaction. תוצאה: 9:08 דק׳, 5 ממצאים, היחיד מ-4 הריצות שזיהה תוצאה עובדתית נכונה (קבלה חלקית). interaction נכשל ב-API ("Agent run id required" בריצה ידנית). | החלפת Curator לאדפטר DeepSeek מקומי |
| `run_curator_sonnet_rerun.sh` | A/B test #3 (2026-05-05) — ריצה חוזרת של Sonnet 4.5 על אותו CMP-78. תוצאה: 12:52 דק׳ (לעומת 20:13 בריצה המקורית — כי בלי לולאת interaction.json). זיהה תוצאה שגויה ("דחייה") **בעקביות עם הריצה המקורית** — Sonnet עקבי-בטעות, DeepSeek אקראי. | בדיקה חד-פעמית — לא להריץ שוב | | `run_curator_sonnet_rerun.sh` | A/B test #3 (2026-05-05) — ריצה חוזרת של Sonnet 4.5 על אותו CMP-78. תוצאה: 12:52 דק׳ (לעומת 20:13 בריצה המקורית — כי בלי לולאת interaction.json). זיהה תוצאה שגויה ("דחייה") **בעקביות עם הריצה המקורית** — Sonnet עקבי-בטעות, DeepSeek אקראי. | בדיקה חד-פעמית — לא להריץ שוב |
| `ingest_incoming_batch.py` | python | קליטת batch של החלטות ועדת ערר מ-`data/precedents/incoming/` דרך המסלול הקנוני (`ingest_internal_decision`) + חילוץ מטא-דאטה לכל תיק (המסלול הפנימי לא מתזמן metadata — INV-ING3). רצף (לא מקבילי, להימנע מעומס CLI). רשימת `DECISIONS` נערכת ידנית לכל batch. config מ-`~/.env`. תומך תהליך [[project_precedent_incoming_workflow]]. | ידני, per-batch (חלופה ל-MCP `internal_decision_upload` כש-batch גדול) | | `ingest_incoming_batch.py` | python | קליטת batch של החלטות ועדת ערר מ-`data/precedents/incoming/` דרך המסלול הקנוני (`ingest_internal_decision`) + חילוץ מטא-דאטה לכל תיק (המסלול הפנימי לא מתזמן metadata — INV-ING3). רצף (לא מקבילי, להימנע מעומס CLI). רשימת `DECISIONS` נערכת ידנית לכל batch. config מ-`~/.env`. תומך תהליך [[project_precedent_incoming_workflow]]. | ידני, per-batch (חלופה ל-MCP `internal_decision_upload` כש-batch גדול) |
| `drain_halacha_queue.py` | python | ריקון תור חילוץ ההלכות (`process_pending_extractions kind='halacha'`) ב-batches של 4 עד שהתור ריק (2 סבבים ריקים). **רץ רק בחלון-לילה 23:0005:00 שעון ישראל** (`_in_window`, zoneinfo DST-safe — המכונה UTC); מחוץ לחלון `===SKIP===`, ונעצר `===STOP===` כשהחלון נסגר (השאר ממשיך בלילה הבא, FIFO+checkpoint). env: `HALACHA_DRAIN_WINDOW_START`/`_END`/`HALACHA_DRAIN_TZ`. **kill-switch `/operations`:** בודק `is_drain_disabled` בעלייה **וגם בתחילת כל סבב** — כיבוי באמצע-ריצה עוצר את הלולאה בגבול-הסבב הבא (התהליך עצמו נהרג מיד דרך ה-UI-toggle/סופרוייזר). חילוץ-הלכות נשאר על claude_session (לא Gemini). self-heal ל-orphaned `processing`. ההלכות נוחתות `pending_review` (שער-יו"ר). **חילוץ תיק-בודד שהיו"ר מבקש רץ מיד דרך ה-CEO (`precedent_extract_halachot`) ואינו מגודר כאן.** | דרך `legal-halacha-drain.config.cjs` (pm2 cron) / ידני | | `drain_halacha_queue.py` | python | ריקון תור חילוץ ההלכות (`process_pending_extractions kind='halacha'`) **תיק-בודד לסבב (`limit=1`)** עד שהתור ריק (2 סבבים ריקים), עם קצב `pl.INTER_PRECEDENT_COOLDOWN_SEC` בין תיקים (נשמר אחרי המעבר ל-limit=1 שביטל את המרווח הפנימי-לסבב). **רץ רק בחלון-לילה 23:0005:00 שעון ישראל** (`_in_window`, zoneinfo DST-safe — המכונה UTC); מחוץ לחלון `===SKIP===`, ונעצר `===STOP===` כשהחלון נסגר (השאר ממשיך בלילה הבא, FIFO+checkpoint). env: `HALACHA_DRAIN_WINDOW_START`/`_END`/`HALACHA_DRAIN_TZ`. **שער-סף-ניצול (נוהל-יו"ר):** בין כל תיק בודק את ניצול-claude.ai דרך מקור-האמת המשותף `legal_mcp.services.usage_limits` (`USAGE_CEILINGS` — אותו שה-supervisor קורא) ועוצר `===STOP=== usage ceiling reached` כשנחצה סף-רך (5-שעות ≥75% / שבועי ≥65%, env `HALACHA_DRAIN_CEILING_5H`/`_WEEKLY`). הבדיקה **בין** תיקים → התיק שרץ תמיד מסתיים על המכסה שנותרה ופשוט לא מתחיל חדש — עצירה **לפני** 429, לא קטילה. fail-OPEN אם ה-endpoint לא-זמין (ה-supervisor מגבה בקטילת-429). **kill-switch `/operations`:** בודק `is_drain_disabled` בעלייה **וגם בתחילת כל סבב** — כיבוי באמצע-ריצה עוצר את הלולאה בגבול-הסבב הבא (התהליך עצמו נהרג מיד דרך ה-UI-toggle/סופרוייזר). חילוץ-הלכות נשאר על claude_session (לא Gemini). self-heal ל-orphaned `processing`. ההלכות נוחתות `pending_review` (שער-יו"ר). **חילוץ תיק-בודד שהיו"ר מבקש רץ מיד דרך ה-CEO (`precedent_extract_halachot`) ואינו מגודר כאן.** | דרך `legal-halacha-drain.config.cjs` (pm2 cron) / ידני |
| `legal-halacha-drain.config.cjs` | pm2/js | **תזמון חלון-לילה של `drain_halacha_queue.py`** (cron UTC `10 20,21,22,23,0,1,2,3 * * *` = superset שמכסה את 23:0005:00 ישראל בקיץ ובחורף; הסקריפט גוזם לחלון המדויק ב-zoneinfo). דקת-הצתה `:10` (לא `:00`) כדי לא לחלוק דקה עם metadata-drain (`:00`) או supervisor (`:05`) — מונע deadlock של DDL-המיגרציה כששני דריינים עולים יחד. `HALACHA_DRAIN_CRON` לעקיפה. ירייה כל שעה גם מחדשת one-shot שמת באמצע (advisory-lock הופך חפיפה לבטוחה). דורש claude CLI. התקנה: `pm2 start scripts/legal-halacha-drain.config.cjs && pm2 save`. | pm2 cron (host-side) | | `legal-halacha-drain.config.cjs` | pm2/js | **תזמון חלון-לילה של `drain_halacha_queue.py`** (cron UTC `10 20,21,22,23,0,1,2,3 * * *` = superset שמכסה את 23:0005:00 ישראל בקיץ ובחורף; הסקריפט גוזם לחלון המדויק ב-zoneinfo). דקת-הצתה `:10` (לא `:00`) כדי לא לחלוק דקה עם metadata-drain (`:00`) או supervisor (`:05`) — מונע deadlock של DDL-המיגרציה כששני דריינים עולים יחד. `HALACHA_DRAIN_CRON` לעקיפה. ירייה כל שעה גם מחדשת one-shot שמת באמצע (advisory-lock הופך חפיפה לבטוחה). דורש claude CLI. התקנה: `pm2 start scripts/legal-halacha-drain.config.cjs && pm2 save`. | pm2 cron (host-side) |
| `halacha_drain_supervisor.py` | python | **מנהל-בריאות קבוע ל-`legal-halacha-drain`** (אפס צריכת-Claude — קורא DB/לוגים/pm2 ומצית את הדריינר הקיים). טיק יחיד: **מכבד `is_drain_disabled` בעדיפות עליונה — אם כבוי ב-/operations עוצר את הדריינר ולא מצית** · מצית כשבטל+תור≠ריק · restart ל-run תקוע (liveness לפי checkpoints-per-chunk, **לא** mtime-לוג שמתעדכן רק בסיום תיק ~10 דק') · **זיהוי rate-limit: ה-PRIMARY הוא `quota_exhausted()` הסמכותי (קורא `subscription_usage()` — endpoint לא-מתועד `GET /api/oauth/usage`, token מ-`~/.claude/.credentials.json` + UA `claude-code/*` חובה אחרת 429 — אותו אחוז-ניצול 5-שעות/שבועי/שבועי-Sonnet שה-UI מציג. **ספי-עצירה-רכים `USAGE_CEILINGS` (נוהל-יו"ר 2026-06-15): עוצר לפני 429 — 5-שעות ≥75%, שבועי+שבועי-Sonnet ≥65% (עקיפה: `HALACHA_DRAIN_CEILING_5H`/`HALACHA_DRAIN_CEILING_WEEKLY`); הגעה לסף = בדיוק כמו מיצוי, cooldown עד `resets_at` של אותו חלון. הנימוק: 429 באמצע-תיק כופה חילוץ-מחדש של תיק שהושלם ומשחית אותו**), durable ואינו תלוי בעומק-זנב-הלוג; `scan_rate_limit` (429 + parse איפוס, מגודר-טריות) רק כ-fallback כש-endpoint לא-זמין — תיקון הבאג שבו 429 שגלל מתחת ל-churn של ה-restart נקרא כ"hung" וגרר restart-storm שבזבז מכסה.** בזמן מוגבל **עוצר את הדריינר** (`hold-stopped`) כדי לא להלום 429, ומצית-מחדש כשהמכסה חוזרת (כל החלונות חזרו מתחת לסף; fallback `claude -p` זעיר אם ה-endpoint לא זמין). `status` מדפיס ניצול/סף-עצירה · **חלון catch-up בוקר `[05:0007:00 IDT)` (`CATCHUP_END`) שנפתח רק לניקוי backlog שנותר כשהמכסה חזרה — איפוס-5-שעות מאוחר נוחת לרוב מעט אחרי 05:00; מגודר ב-(לא-מוגבל)+(תור≠ריק), והקצה המורחב מועבר לדריינר כך ש-window-self-guard שלו מקבל את סבבי-ה-catch-up** · מאמת ש-staging מתחייב. **BURST** (חלון "רוץ ברצף עכשיו" ידני): מקור-אמת = `drain_controls.burst_until` ב-DB — אותו ערך ש-/operations קורא/כותב (G1 מקור-יחיד, G2 בלי מסלול מקביל); בעתיד→חלון מורם, אחרת חלון-לילה 23-05; פג-תוקף אוטומטי במועד. תת-פקודות: `tick` (ברירת-מחדל), `burst-on [--until]`, `burst-off`, `status`. | דרך `legal-halacha-supervisor.config.cjs` (pm2 cron) / ידני / כפתור /operations | | `halacha_drain_supervisor.py` | python | **מנהל-בריאות קבוע ל-`legal-halacha-drain`** (אפס צריכת-Claude — קורא DB/לוגים/pm2 ומצית את הדריינר הקיים). טיק יחיד: **מכבד `is_drain_disabled` בעדיפות עליונה — אם כבוי ב-/operations עוצר את הדריינר ולא מצית** · מצית כשבטל+תור≠ריק · restart ל-run תקוע (liveness לפי checkpoints-per-chunk, **לא** mtime-לוג שמתעדכן רק בסיום תיק ~10 דק') · **זיהוי rate-limit: ה-PRIMARY הוא `quota_exhausted()` הסמכותי (wrapper דק מעל מקור-האמת המשותף `legal_mcp.services.usage_limits` — `subscription_usage()`+`ceiling_status()`+`USAGE_CEILINGS`, אותו שהדריינר קורא, G1/G2 בלי כפילות; endpoint לא-מתועד `GET /api/oauth/usage`, token מ-`~/.claude/.credentials.json` + UA `claude-code/*` חובה אחרת 429 — אותו אחוז-ניצול 5-שעות/שבועי/שבועי-Sonnet שה-UI מציג. **ספי-עצירה-רכים `USAGE_CEILINGS` (נוהל-יו"ר 2026-06-15): עוצר לפני 429 — 5-שעות ≥75%, שבועי+שבועי-Sonnet ≥65% (עקיפה: `HALACHA_DRAIN_CEILING_5H`/`HALACHA_DRAIN_CEILING_WEEKLY`); הגעה לסף = בדיוק כמו מיצוי, cooldown עד `resets_at` של אותו חלון. הנימוק: 429 באמצע-תיק כופה חילוץ-מחדש של תיק שהושלם ומשחית אותו**), durable ואינו תלוי בעומק-זנב-הלוג; `scan_rate_limit` (429 + parse איפוס, מגודר-טריות) רק כ-fallback כש-endpoint לא-זמין — תיקון הבאג שבו 429 שגלל מתחת ל-churn של ה-restart נקרא כ"hung" וגרר restart-storm שבזבז מכסה.** **שתי תגובות ל-cooldown:** בסף-רך (≥75/65, **בלי** 429 טרי) — `hold-soft`: **אינו קוטל**, הדריינר קורא את אותו סף ועוצר לבד בין תיקים אחרי שמסיים את התיק הנוכחי (יש מכסה לסיים); קטילה (`hold-stopped`, `pm2 stop`) רק ב-**429 טרי אמיתי** (`log_rl` — הקריאות באמת נכשלות). מצית-מחדש כשהמכסה חוזרת (כל החלונות מתחת לסף; fallback `claude -p` זעיר אם ה-endpoint לא זמין). `status` מדפיס ניצול/סף-עצירה · **חלון catch-up בוקר `[05:0007:00 IDT)` (`CATCHUP_END`) שנפתח רק לניקוי backlog שנותר כשהמכסה חזרה — איפוס-5-שעות מאוחר נוחת לרוב מעט אחרי 05:00; מגודר ב-(לא-מוגבל)+(תור≠ריק), והקצה המורחב מועבר לדריינר כך ש-window-self-guard שלו מקבל את סבבי-ה-catch-up** · מאמת ש-staging מתחייב. **BURST** (חלון "רוץ ברצף עכשיו" ידני): מקור-אמת = `drain_controls.burst_until` ב-DB — אותו ערך ש-/operations קורא/כותב (G1 מקור-יחיד, G2 בלי מסלול מקביל); בעתיד→חלון מורם, אחרת חלון-לילה 23-05; פג-תוקף אוטומטי במועד. תת-פקודות: `tick` (ברירת-מחדל), `burst-on [--until]`, `burst-off`, `status`. | דרך `legal-halacha-supervisor.config.cjs` (pm2 cron) / ידני / כפתור /operations |
| `legal-halacha-supervisor.config.cjs` | pm2/js | **תזמון כל 15 דק' של `halacha_drain_supervisor.py`** (cron `5-59/15 * * * *` = `:05,:20,:35,:50`, `HALACHA_SUPERVISOR_CRON` לעקיפה; דקת-הצתה `:05` כדי לא לחלוק דקה עם metadata-drain `:00` או halacha-drain `:10` — מונע deadlock של DDL-המיגרציה). `autorestart:false` (one-shot per tick). מצב-state ב-`~/halacha-drain-monitor/` (מחוץ ל-repo). התקנה: `pm2 start scripts/legal-halacha-supervisor.config.cjs && pm2 save`. | pm2 cron (host-side) | | `legal-halacha-supervisor.config.cjs` | pm2/js | **תזמון כל 15 דק' של `halacha_drain_supervisor.py`** (cron `5-59/15 * * * *` = `:05,:20,:35,:50`, `HALACHA_SUPERVISOR_CRON` לעקיפה; דקת-הצתה `:05` כדי לא לחלוק דקה עם metadata-drain `:00` או halacha-drain `:10` — מונע deadlock של DDL-המיגרציה). `autorestart:false` (one-shot per tick). מצב-state ב-`~/halacha-drain-monitor/` (מחוץ ל-repo). התקנה: `pm2 start scripts/legal-halacha-supervisor.config.cjs && pm2 save`. | pm2 cron (host-side) |
| `ingest_digests_batch.py` | python | קליטת batch של יומוני "כל יום" מ-`data/digests/incoming/` דרך המסלול העצמאי של קורפוס-הגילוי (`digest_library.ingest_digest`) — חילוץ-LLM (תג-מושג, כותרת-הלכה, מראה-מקום, שני-תאריכים), embedding יחיד, ו-autolink לפסק המקורי (X12/INV-DIG3). רצף (לא מקבילי). מזהה-יומון+תאריך נגזרים משם-הקובץ; העלון החודשי מדולג. **לא מעביר קבצים** — ה-DB (content_hash) הוא מקור-האמת היחיד; הרצה חוזרת מדלגת על קיימים (`exists`). config מ-`~/.env`. | ידני, per-batch (חלופה ל-MCP `digest_upload`) | | `ingest_digests_batch.py` | python | קליטת batch של יומוני "כל יום" מ-`data/digests/incoming/` דרך המסלול העצמאי של קורפוס-הגילוי (`digest_library.ingest_digest`) — חילוץ-LLM (תג-מושג, כותרת-הלכה, מראה-מקום, שני-תאריכים), embedding יחיד, ו-autolink לפסק המקורי (X12/INV-DIG3). רצף (לא מקבילי). מזהה-יומון+תאריך נגזרים משם-הקובץ; העלון החודשי מדולג. **לא מעביר קבצים** — ה-DB (content_hash) הוא מקור-האמת היחיד; הרצה חוזרת מדלגת על קיימים (`exists`). config מ-`~/.env`. | ידני, per-batch (חלופה ל-MCP `digest_upload`) |
| `drain_digests.py` | python | ריקון תור ההעשרה של יומונים (X12): מעבד כל digest בסטטוס `pending` דרך `digest_library.enrich_digest` (חילוץ-LLM Sonnet + embedding + autolink). מקבילי (CONCURRENCY=3, env-tunable), idempotent. מוסיף `~/.local/bin` ל-PATH כדי שה-claude CLI יימצא תחת cron. בודק דגל `drain_controls('legal-digest-drain')` ב-startup → no-op כשכבוי מ-/operations. | דרך `legal-digest-drain.config.cjs` (pm2 cron) + ידני אחרי backfill. חלופת-MCP: `digest_process_pending` | | `drain_digests.py` | python | ריקון תור ההעשרה של יומונים (X12): מעבד כל digest בסטטוס `pending` דרך `digest_library.enrich_digest` (חילוץ-LLM Sonnet + embedding + autolink). מקבילי (CONCURRENCY=3, env-tunable), idempotent. מוסיף `~/.local/bin` ל-PATH כדי שה-claude CLI יימצא תחת cron. בודק דגל `drain_controls('legal-digest-drain')` ב-startup → no-op כשכבוי מ-/operations. | דרך `legal-digest-drain.config.cjs` (pm2 cron) + ידני אחרי backfill. חלופת-MCP: `digest_process_pending` |

View File

@@ -1,6 +1,6 @@
"""Drain the halacha extraction queue for the incoming batch. """Drain the halacha extraction queue for the incoming batch.
Calls the canonical process_pending_extractions(kind='halacha') in small batches Calls the canonical process_pending_extractions(kind='halacha') ONE case per round
until the queue is empty (two consecutive zero-progress rounds). Serial + global until the queue is empty (two consecutive zero-progress rounds). Serial + global
advisory-lock coordinated inside the service — avoids concurrent Claude load spikes. advisory-lock coordinated inside the service — avoids concurrent Claude load spikes.
@@ -14,6 +14,14 @@ requested by the chair goes through the CEO immediately and is NOT gated here.
Window is DST-safe (zoneinfo) — the host runs in UTC. Env overrides: Window is DST-safe (zoneinfo) — the host runs in UTC. Env overrides:
HALACHA_DRAIN_WINDOW_START / _END (hours, 023) · HALACHA_DRAIN_TZ. HALACHA_DRAIN_WINDOW_START / _END (hours, 023) · HALACHA_DRAIN_TZ.
USAGE-CEILING: between cases the drain checks the claude.ai utilization and stops
GRACEFULLY (===STOP===) once a soft ceiling is crossed (5-hour ≥75% / weekly ≥65%,
USAGE_CEILINGS in legal_mcp.services.usage_limits — the SAME source the supervisor
reads). Because it's checked between cases, the in-flight case always finishes on
the remaining quota; the drain just doesn't start a new one — stopping BEFORE a 429
rather than hammering it. Resumes when the window resets. Env overrides:
HALACHA_DRAIN_CEILING_5H / _WEEKLY.
Run: mcp-server/.venv/bin/python scripts/drain_halacha_queue.py Run: mcp-server/.venv/bin/python scripts/drain_halacha_queue.py
""" """
@@ -27,6 +35,7 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "mcp-server", "
from legal_mcp.services import db from legal_mcp.services import db
from legal_mcp.services import precedent_library as pl from legal_mcp.services import precedent_library as pl
from legal_mcp.services import usage_limits
_TZ = ZoneInfo(os.environ.get("HALACHA_DRAIN_TZ", "Asia/Jerusalem")) _TZ = ZoneInfo(os.environ.get("HALACHA_DRAIN_TZ", "Asia/Jerusalem"))
_WINDOW_START = int(os.environ.get("HALACHA_DRAIN_WINDOW_START", "23")) _WINDOW_START = int(os.environ.get("HALACHA_DRAIN_WINDOW_START", "23"))
@@ -74,8 +83,26 @@ async def main():
print(f"===STOP=== drain window closed ({_WINDOW_END:02d}:00) — " print(f"===STOP=== drain window closed ({_WINDOW_END:02d}:00) — "
f"{total} cases this run; rest resumes next night", flush=True) f"{total} cases this run; rest resumes next night", flush=True)
break break
# Soft usage-ceiling gate (chair: 5-hour ≥75% / weekly ≥65%). Checked
# BETWEEN cases (limit=1 below), so the in-flight case always finishes on
# the remaining quota and we just don't start a new one — stopping BEFORE
# a 429 instead of hammering it. Same USAGE_CEILINGS the supervisor reads
# (single source of truth: legal_mcp.services.usage_limits).
# FAIL-OPEN on endpoint failure (returns None): keep draining — the
# supervisor backstops a real 429 by hard-killing. Do NOT "fix" this to
# fail-closed; a throttled endpoint must not halt the night's drain.
usage = await asyncio.get_event_loop().run_in_executor(
None, usage_limits.subscription_usage)
if usage is not None:
over, _, detail = usage_limits.ceiling_status(usage)
if over:
print(f"===STOP=== usage ceiling reached ({detail}) — finished "
f"{total} case(s); resumes when quota resets", flush=True)
break
rnd += 1 rnd += 1
out = await pl.process_pending_extractions(kind="halacha", limit=4) # limit=1 → the ceiling (and window/disabled) are re-checked between every
# case, so a soft stop never abandons an in-flight extraction.
out = await pl.process_pending_extractions(kind="halacha", limit=1)
processed = out.get("processed", 0) processed = out.get("processed", 0)
total_pending = out.get("total_pending", 0) total_pending = out.get("total_pending", 0)
total += processed total += processed
@@ -87,6 +114,11 @@ async def main():
await asyncio.sleep(5) await asyncio.sleep(5)
else: else:
empty_rounds = 0 empty_rounds = 0
# Preserve the inter-case spacer that process_pending_extractions
# applies WITHIN a batch (only when idx>0) — with limit=1 every batch
# is one case, so it never fires there; pace here instead to avoid
# 429 bursts. Single source: the service's own cooldown constant.
await asyncio.sleep(pl.INTER_PRECEDENT_COOLDOWN_SEC)
print(f"\n===DONE=== total halachot rounds processed; cases handled cumulatively={total}", flush=True) print(f"\n===DONE=== total halachot rounds processed; cases handled cumulatively={total}", flush=True)

View File

@@ -49,19 +49,12 @@ import json
import os import os
import re import re
import subprocess import subprocess
import sys
import urllib.request import urllib.request
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from glob import glob from glob import glob
REPO = "/home/chaim/legal-ai" REPO = "/home/chaim/legal-ai"
# claude.ai subscription usage — the same 5-hour / 7-day utilization the Claude
# Code status bar shows, via the (undocumented) OAuth usage endpoint. The token
# lives in the CLI's own credentials file; the claude-code User-Agent is
# REQUIRED — without it the request lands in an aggressively rate-limited bucket
# and 429s. Unofficial endpoint: may change, so callers must tolerate None.
CLAUDE_CRED_PATH = "/home/chaim/.claude/.credentials.json"
OAUTH_USAGE_URL = "https://api.anthropic.com/api/oauth/usage"
_USAGE_UA = "claude-code/2.1.177"
RUNTIME_DIR = "/home/chaim/halacha-drain-monitor" # state (outside repo) RUNTIME_DIR = "/home/chaim/halacha-drain-monitor" # state (outside repo)
STATE = os.path.join(RUNTIME_DIR, "state.json") STATE = os.path.join(RUNTIME_DIR, "state.json")
DRAIN = "legal-halacha-drain" DRAIN = "legal-halacha-drain"
@@ -77,29 +70,17 @@ NIGHT_START, NIGHT_END = 23, 5 # the drain's normal window (IDT hours)
CATCHUP_END = 7 # soft window end (IDT) for early-morning catch-up — see fix B CATCHUP_END = 7 # soft window end (IDT) for early-morning catch-up — see fix B
def _env_int(name: str, default: int) -> int: # Usage-ceiling logic (subscription_usage / ceiling_status / USAGE_CEILINGS) is
try: # the SINGLE source of truth, shared with the drain — see
return int(os.environ.get(name, default)) # legal_mcp/services/usage_limits.py. The supervisor runs as system python3, so
except (TypeError, ValueError): # put the (stdlib-only) package on the path before importing it. Resolve relative
return default # to THIS file (same as drain_halacha_queue.py) so the module is loaded from the
# same checkout the script lives in.
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)),
# Soft utilization ceilings — stop the drain BEFORE a window actually exhausts "..", "mcp-server", "src"))
# (429s). Hitting a 429 mid-case forces re-extraction of an already-completed from legal_mcp.services.usage_limits import ( # noqa: E402
# case under the rate limit, DEGRADING it; stopping at the chair's ceilings instead USAGE_CEILINGS, ceiling_status, subscription_usage,
# lets the in-flight halacha case finish cleanly and the drain idle until the )
# window resets. Reaching a ceiling is treated EXACTLY like 100% exhaustion
# (cooldown until that window's resets_at). Per the chair (2026-06-15): the 5-hour
# ("hourly session") window stops at 75%, the weekly windows at 65%. Both keys map
# to the same windows quota_available / quota_exhausted gate on; overridable via
# env for ops tuning without a redeploy.
CEILING_FIVE_HOUR = _env_int("HALACHA_DRAIN_CEILING_5H", 75)
CEILING_WEEKLY = _env_int("HALACHA_DRAIN_CEILING_WEEKLY", 65)
USAGE_CEILINGS = {
"five_hour": CEILING_FIVE_HOUR,
"seven_day": CEILING_WEEKLY,
"seven_day_sonnet": CEILING_WEEKLY,
}
def _now_utc(): def _now_utc():
@@ -135,31 +116,6 @@ CLAUDE = claude_bin()
_ENV = {**os.environ, "HOME": "/home/chaim"} _ENV = {**os.environ, "HOME": "/home/chaim"}
def subscription_usage() -> dict | None:
"""Read the claude.ai subscription usage — the exact 5-hour / 7-day
utilization the Claude Code UI shows — from the OAuth usage endpoint.
Returns the parsed JSON (keys: five_hour, seven_day, seven_day_opus,
seven_day_sonnet, extra_usage; each window → {utilization 0-100, resets_at})
or None on ANY failure. Undocumented endpoint — every caller must tolerate
None and fall back."""
try:
with open(CLAUDE_CRED_PATH) as f:
token = json.load(f)["claudeAiOauth"]["accessToken"]
except Exception:
return None
req = urllib.request.Request(OAUTH_USAGE_URL, headers={
"Authorization": f"Bearer {token}",
"User-Agent": _USAGE_UA, # required — else aggressive 429
"anthropic-beta": "oauth-2025-04-20",
})
try:
with urllib.request.urlopen(req, timeout=15) as resp:
return json.loads(resp.read().decode("utf-8"))
except Exception:
return None
def quota_available() -> bool: def quota_available() -> bool:
"""Is the claude.ai quota actually usable right now? """Is the claude.ai quota actually usable right now?
@@ -176,14 +132,10 @@ def quota_available() -> bool:
timeout, or limit message → treat as still limited.""" timeout, or limit message → treat as still limited."""
usage = subscription_usage() usage = subscription_usage()
if usage is not None: if usage is not None:
# A drain run needs the 5-hour window, the weekly all-models cap, AND # All gated windows (5-hour, weekly all-models, weekly-Sonnet) must be
# the weekly per-model cap all below their ceilings. On this account the # below their ceilings — same evaluation the drain uses (ceiling_status).
# per-model cap that's actually populated is Sonnet (seven_day_opus is over, _, _ = ceiling_status(usage)
# null — no separate Opus cap); the all-models seven_day cap is the return not over
# backstop for Opus usage either way. null utilization → treated as 0%.
utils = {w: (usage.get(w) or {}).get("utilization") for w in USAGE_CEILINGS}
# utilization may be None (window inactive / no data) → treat as 0%.
return all((u or 0) < USAGE_CEILINGS[w] for w, u in utils.items())
# ── fallback: official-CLI probe ── # ── fallback: official-CLI probe ──
try: try:
r = subprocess.run([CLAUDE, "-p", "Reply with exactly: OK"], r = subprocess.run([CLAUDE, "-p", "Reply with exactly: OK"],
@@ -210,23 +162,13 @@ def quota_exhausted():
Returns (exhausted: bool, earliest_reset_utc: datetime|None), or None when the Returns (exhausted: bool, earliest_reset_utc: datetime|None), or None when the
endpoint is unreachable (caller falls back to the log scrape). A window counts endpoint is unreachable (caller falls back to the log scrape). A window counts
as exhausting the drain at >= its USAGE_CEILINGS ceiling (the chair's soft as exhausting the drain at >= its USAGE_CEILINGS ceiling (the chair's soft
stop-before-429 thresholds) — same windows quota_available gates on (5-hour, stop-before-429 thresholds) — the SAME evaluation the drain gates on
weekly all-models, weekly-Sonnet).""" (ceiling_status)."""
usage = subscription_usage() usage = subscription_usage()
if usage is None: if usage is None:
return None return None
exhausted, resets = False, [] over, reset, _ = ceiling_status(usage)
for w, ceiling in USAGE_CEILINGS.items(): return over, reset
info = usage.get(w) or {}
if (info.get("utilization") or 0) >= ceiling:
exhausted = True
r = info.get("resets_at")
if r:
try:
resets.append(datetime.fromisoformat(r).astimezone(timezone.utc))
except Exception:
pass
return exhausted, (min(resets) if resets else None)
# ── DB access (via the repo venv; the module self-configures) ──────────────── # ── DB access (via the repo venv; the module self-configures) ────────────────
@@ -544,15 +486,24 @@ def tick():
notes.append("התור ריק — אין מה לחלץ.") notes.append("התור ריק — אין מה לחלץ.")
elif in_cooldown: elif in_cooldown:
mode = "weekly_exhausted" if weekly else "ratelimited" mode = "weekly_exhausted" if weekly else "ratelimited"
# Stop a running drain while limited — otherwise it keeps spawning Opus # Two cooldown causes, two responses:
# calls that 429 on every chunk, burning the very quota we're waiting on # • FRESH real 429 (log_rl) — the drain is literally failing on every
# (and burying the 429 signal under teardown noise). It re-ignites via the # chunk, burning the quota we're waiting on and re-extracting/degrading
# normal trigger path once cooldown clears. # completed cases. HARD-KILL it (pm2 stop); re-ignites once cooldown
if status == "online": # clears.
# • SOFT ceiling (≥75%/65% with no fresh 429) — there's still quota
# headroom, so DON'T kill: the drain reads the SAME ceiling and stops
# itself between cases, finishing the in-flight case cleanly. We just
# hold (never re-trigger while in_cooldown).
if status == "online" and log_rl:
stop_drain() stop_drain()
action = "hold-stopped" action = "hold-stopped"
notes.append(f"rate-limit פעיל — הדריינר נעצר כדי לא לבזבז מכסה על 429; " notes.append(f"429 טרי — הדריינר נקטל כדי לא להלום rate-limit; "
f"איפוס ~{cd_dt.astimezone(IDT):%H:%M IDT}.") f"איפוס ~{cd_dt.astimezone(IDT):%H:%M IDT}.")
elif status == "online":
action = "hold-soft"
notes.append(f"סף-ניצול נחצה (אין 429 טרי) — הדריינר יסיים את התיק "
f"הנוכחי ויעצור לבד; איפוס ~{cd_dt.astimezone(IDT):%H:%M IDT}.")
else: else:
action = "hold" action = "hold"
notes.append(f"rate-limit פעיל; איפוס ~{cd_dt.astimezone(IDT):%H:%M IDT}.") notes.append(f"rate-limit פעיל; איפוס ~{cd_dt.astimezone(IDT):%H:%M IDT}.")