Files
legal-ai/scripts/drain_digests.py
Chaim 638eef6803 feat(ops): /operations — מוני-תור אחידים, "מה רץ עכשיו", וניהול-תהליכים
הדף הציג את התורים באופן לא-אחיד (by_status גולמי), בלי הבחנה בין "ממתין"
(בקלוג: status=pending) ל"בתור" (התור הפעיל: requested_at IS NOT NULL), בלי
הצגת הפריט שרץ כרגע, ובלי שום שליטה בתהליכים.

מה נוסף:
1. כרטיסי-תור אחידים — בתור / ממתין(בקלוג) / בעיבוד / הושלם / נכשל + "רץ עכשיו"
   (citation/case_number של הפריט בעיבוד) לכל drain (אחזור-פסיקה, מטא-דאטה,
   הלכות, יומונים). שערי-אנוש (אישור-הלכות, פסיקה-חסרה) נשארים מוני-סטטוס.
2. פאנל ניהול-תהליכים בסגנון "שירותי Windows":
   - דמון (court-fetch-service/xvfb/chat/reaper): הפעל-מחדש / עצור / הפעל.
   - cron drain: "הרץ עכשיו" (pm2 restart) + מתג הפעל/כבה תזמון.
3. כל תגי-הסטטוס מתורגמים לעברית.

מנגנון:
- הפעל/כבה תזמון = דגל ב-DB (טבלה drain_controls). pm2 cron_restart מחיה תהליך
  שעוצר ב-stop, לכן ה"כיבוי" האמין הוא דגל שכל drain בודק ב-startup (no-op מיידי
  כשכבוי). הקונטיינר כותב/קורא ישירות מ-DB.
- הרץ-עכשיו + restart/stop/start = proxy ל-pm2 דרך endpoint חדש בגשר-המארח
  (court_fetch_service /pm2/control), מאובטח Bearer + whitelist ל-legal-* בלבד.
- יומונים: drain_digests הועבר מ-crontab ל-pm2 (legal-digest-drain.config.cjs)
  כדי שיופיע ויהיה שליט כמו כל drain. drain_halacha_queue.py הובא לבקרת-גרסאות.

Invariants: מקיים G2 (הרחבת /operations + הגשר הקיים, לא מסלול מקביל) ו-G1
(drain_controls = מקור-אמת יחיד לכיבוי, נורמליזציה במקור ולא תיקון-בקריאה).
אין בליעת שגיאות שקטה (הגשר מחזיר {ok,error}; המוטציות מציגות toast).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-08 08:57:23 +00:00

119 lines
5.3 KiB
Python

"""Drain the digest enrichment queue (X12) — local LLM enrichment of pending digests.
The web/n8n upload path creates digest rows with extraction_status='pending'
(container-safe: stage + extract_text only). The LLM metadata extraction +
embedding + autolink MUST run locally (claude_session is local-only — the
``claude`` CLI is not in the container). This script is that local drainer:
pending digests → digest_library.enrich_digest (Sonnet, tools="") → completed
Concurrency-limited (avoids LLM rate-limit storms). Idempotent — only touches
rows still 'pending'; safe to re-run. The DB is the single source of truth.
Used two ways:
1. Manually after a backfill: mcp-server/.venv/bin/python scripts/drain_digests.py
2. pm2 cron ``legal-digest-drain`` (scripts/legal-digest-drain.config.cjs) —
one-shot per tick. Controllable from the /operations dashboard (run-now /
enable / disable). Logs to data/digests/drain.log.
claude CLI must be on PATH (this script prepends ~/.local/bin to PATH itself, so a
bare cron/pm2 environment works). Config (POSTGRES_URL, VOYAGE_API_KEY) auto-loads
from ~/.env via legal_mcp.config.
"""
import asyncio
import os
import sys
import time
from datetime import datetime, timezone
# Ensure the local claude CLI is reachable even under a bare cron PATH.
os.environ["PATH"] = os.path.expanduser("~/.local/bin") + os.pathsep + os.environ.get("PATH", "")
# Put ../mcp-server/src (relative to this file's directory) at the front of
# sys.path so the legal_mcp import on the next line resolves. It must run
# before that import, hence the E402 suppression there.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "mcp-server", "src"))
from legal_mcp.services import db, digest_library as dl # noqa: E402
# Upper bound on enrich_digest calls in flight at once (it sizes the semaphore
# in main()). Override with the DIGEST_DRAIN_CONCURRENCY env var; defaults to 3.
# A non-integer value raises ValueError right here, at import time.
CONCURRENCY = int(os.environ.get("DIGEST_DRAIN_CONCURRENCY", "3"))
async def main() -> int:
    """Run one drain pass over the digest enrichment queue.

    Steps, in order:
      1. Open the DB pool.
      2. Return immediately when the ``legal-digest-drain`` switch is off.
      3. Self-heal: push unclassified 'completed' rows and leftover
         'processing' rows back to 'pending'.
      4. Enrich every 'pending' digest (oldest first) through
         ``dl.enrich_digest``, at most ``CONCURRENCY`` at a time.

    The pool is closed on every exit path, including when a query or the
    gather raises.

    Returns:
        0 when the drain is disabled, nothing is pending, or every digest
        was enriched; 1 when at least one digest failed.
    """
    pool = await db.get_pool()
    try:
        # /operations "disable" switch — no-op immediately if turned off (pm2
        # cron_restart can still fire a stopped job, so the gate lives in the DB).
        if await db.is_drain_disabled("legal-digest-drain"):
            print("===SKIP=== legal-digest-drain disabled via /operations", flush=True)
            return 0

        # Per the original author's note, get_pool() runs schema migrations
        # first — incl. the V32 digest_kind backfill that classifies legacy
        # rows — so the failure check below is safe from the very first run.
        #
        # Self-heal: a successful enrich ALWAYS sets digest_kind, so a
        # 'completed' row with digest_kind='' means the extraction never
        # landed — reset it to 'pending' to retry. Announcements carry
        # digest_kind='announcement' and are therefore left alone.
        healed = await pool.execute(
            "UPDATE digests SET extraction_status = 'pending' "
            "WHERE extraction_status = 'completed' "
            "AND coalesce(digest_kind,'') = '' "
            "AND coalesce(analysis_text,'') <> ''"
        )
        if healed and healed != "UPDATE 0":
            print(f"self-heal: reset unclassified (failed) digests → pending ({healed})", flush=True)

        # Self-heal stale 'processing': any row still 'processing' at the start
        # of this run is treated as left over from a run that was killed mid-row.
        # NOTE(review): this relies on only one drainer running at a time (the
        # original comment cites flock) — confirm that still holds under pm2.
        stale = await pool.execute(
            "UPDATE digests SET extraction_status = 'pending' WHERE extraction_status = 'processing'"
        )
        if stale and stale != "UPDATE 0":
            print(f"self-heal: reset stale processing → pending ({stale})", flush=True)

        rows = await pool.fetch(
            "SELECT id FROM digests WHERE extraction_status = 'pending' ORDER BY created_at"
        )
        ids = [r["id"] for r in rows]
        total = len(ids)
        stamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%SZ")
        if not total:
            print(f"[{stamp}] no pending digests — nothing to drain", flush=True)
            return 0

        # Semaphore(0) would block every worker forever and a negative value
        # raises ValueError, so never go below one slot.
        limit = max(1, CONCURRENCY)
        if limit != CONCURRENCY:
            print(
                f"DIGEST_DRAIN_CONCURRENCY={CONCURRENCY} is not usable — using {limit}",
                flush=True,
            )
        print(f"[{stamp}] draining {total} pending digests @ concurrency={limit}", flush=True)

        sem = asyncio.Semaphore(limit)
        state = {"done": 0, "ok": 0, "linked": 0, "fail": 0}
        # Monotonic clock: elapsed time must not jump if the wall clock is adjusted.
        t0 = time.monotonic()

        async def work(did):
            """Enrich one digest and fold the outcome into ``state``."""
            async with sem:
                try:
                    res = await dl.enrich_digest(did)
                    # Inspect the result before counting success, so a
                    # malformed result is counted once (as a failure) rather
                    # than as both ok and fail.
                    was_linked = bool(res.get("linked_case_law_id"))
                except Exception as e:
                    # Deliberate best-effort: one bad digest must not abort the
                    # batch; it is logged and reflected in the exit code.
                    state["fail"] += 1
                    print(f" FAIL {did}: {type(e).__name__}: {e}", flush=True)
                else:
                    state["ok"] += 1
                    if was_linked:
                        state["linked"] += 1
                state["done"] += 1
                # Progress line every 20 digests and once more at the end.
                if state["done"] % 20 == 0 or state["done"] == total:
                    el = (time.monotonic() - t0) / 60
                    print(
                        f" progress {state['done']}/{total} | ok={state['ok']} "
                        f"linked={state['linked']} fail={state['fail']} | {el:.1f}min",
                        flush=True,
                    )

        await asyncio.gather(*(work(did) for did in ids))

        done_stamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%SZ")
        print(
            f"[{done_stamp}] DONE {state['done']}/{total} | ok={state['ok']} "
            f"linked={state['linked']} fail={state['fail']} | {(time.monotonic() - t0) / 60:.1f}min",
            flush=True,
        )
        return 1 if state["fail"] else 0
    finally:
        # Single cleanup point for every exit path above.
        await db.close_pool()
if __name__ == "__main__":
    # Script entry point: run the async drain to completion and use main()'s
    # return value as the process exit status.
    raise SystemExit(asyncio.run(main()))