~2% מגיליונות "כל יום" הם לא-הכרעות (עדכוני-חקיקה/הודעות/ברכות) ללא ruling → החילוץ ה-decision-centric החזיר ריק → both-empty → מחזורי ב-self-heal. - SCHEMA_V32: `digest_kind` (decision/announcement/other) + backfill legacy בזול (יש citation→decision, אחרת announcement) — לפני שה-self-heal מסתמך עליו. - extractor: prompt מסווג + מחלץ תמיד concept/headline/summary; underlying_* רק ל-decision. extract מנרמל digest_kind. - enrich: שומר digest_kind; חילוץ מוצלח תמיד מסתיים ב-kind לא-ריק (ברירת-מחדל לפי citation אם המודל השמיט). - drain self-heal: הגדרת-כשל = completed עם digest_kind='' (במקום both-empty) → הודעות לא מנוסות-מחדש לנצח. - db: digest_kind ב-_DIGEST_COLS + update-whitelist (זורם ל-search/list/API). - X12 spec: תיעוד digest_kind + הגדרת-הכשל המתוקנת. אומת: V32 סיווג 533 (525 decision + 8 announcement, 0 unclassified — self-heal לא נוגע בהם). extract: 5163→decision+citation · 5060→announcement+concept, citation ריק (לא both-empty). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
112 lines
4.9 KiB
Python
112 lines
4.9 KiB
Python
"""Drain the digest enrichment queue (X12) — local LLM enrichment of pending digests.
|
|
|
|
The web/n8n upload path creates digest rows with extraction_status='pending'
|
|
(container-safe: stage + extract_text only). The LLM metadata extraction +
|
|
embedding + autolink MUST run locally (claude_session is local-only — the
|
|
``claude`` CLI is not in the container). This script is that local drainer:
|
|
|
|
pending digests → digest_library.enrich_digest (Sonnet, tools="") → completed
|
|
|
|
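Concurrency-limited (avoids LLM rate-limit storms). Idempotent — enriches only
rows that are 'pending' once the start-of-run self-heal has re-queued stale
'processing' rows and 'completed' rows that were left unclassified
(digest_kind=''); safe to re-run. The DB is the single source of truth.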
|
|
|
|
Used two ways:
|
|
1. Manually after a backfill: mcp-server/.venv/bin/python scripts/drain_digests.py
|
|
2. Daily cron (after the n8n 09:30 Gmail poll) — see crontab; runs under flock
|
|
so a slow run never overlaps the next. Logs to data/digests/drain.log.
|
|
|
|
claude CLI must be on PATH (the cron line prepends ~/.local/bin). Config
|
|
(POSTGRES_URL, VOYAGE_API_KEY) auto-loads from ~/.env via legal_mcp.config.
|
|
"""
|
|
|
|
import asyncio
|
|
import os
|
|
import sys
|
|
import time
|
|
from datetime import datetime, timezone
|
|
|
|
# Put ~/.local/bin at the front of PATH so the local claude CLI resolves even
# when cron starts this script with a minimal PATH.
os.environ["PATH"] = os.pathsep.join(
    (os.path.expanduser("~/.local/bin"), os.environ.get("PATH", ""))
)
# Make the legal_mcp package (../mcp-server/src relative to this script)
# importable without installing it.
sys.path.insert(
    0,
    os.path.join(os.path.dirname(__file__), os.pardir, "mcp-server", "src"),
)
|
|
|
|
from legal_mcp.services import db, digest_library as dl # noqa: E402
|
|
|
|
# Maximum number of digests enriched at the same time. Overridable through the
# DIGEST_DRAIN_CONCURRENCY environment variable (default 3); a non-integer
# value still fails loudly with ValueError at import time.
# Clamped to at least 1 because the value feeds asyncio.Semaphore: a value of
# 0 would leave every worker waiting forever (the drain would hang), and a
# negative value makes asyncio.Semaphore raise ValueError.
CONCURRENCY = max(1, int(os.environ.get("DIGEST_DRAIN_CONCURRENCY", "3")))
|
|
|
|
|
|
async def main() -> int:
    """Enrich every pending digest and report the outcome.

    Steps, in order:
      1. Self-heal: re-queue 'completed' rows that have an empty digest_kind
         (and non-empty analysis_text), and any row left in 'processing'.
      2. Fetch the ids of all 'pending' digests, oldest first.
      3. Run ``dl.enrich_digest`` on each id, at most ``CONCURRENCY`` at a time,
         printing a progress line every 20 rows and at the end.

    A failure on one digest is logged and counted; it does not stop the others.
    The DB pool is closed on every exit path, including unexpected errors.

    Returns:
        0 when there was nothing to drain or every digest was enriched,
        1 when at least one digest failed (usable as the process exit code).
    """
    pool = await db.get_pool()
    try:
        # get_pool() runs schema migrations first — incl. the V32 digest_kind
        # backfill that classifies legacy rows — so the failure check below is
        # safe from the very first run (no legacy row has digest_kind='').
        #
        # Self-heal: a successful enrich ALWAYS sets digest_kind (decision/
        # announcement/other). So a 'completed' row with digest_kind='' means
        # the extraction never landed (e.g. the local claude subscription
        # window was exhausted) — reset to 'pending' to retry (idempotent
        # auto-resume). This correctly does NOT touch announcements
        # (digest_kind='announcement', legitimately no citation), which the old
        # "both fields empty" heuristic wrongly retried forever.
        healed = await pool.execute(
            "UPDATE digests SET extraction_status = 'pending' "
            "WHERE extraction_status = 'completed' "
            "AND coalesce(digest_kind,'') = '' "
            "AND coalesce(analysis_text,'') <> ''"
        )
        if healed and healed != "UPDATE 0":
            print(f"self-heal: reset unclassified (failed) digests → pending ({healed})", flush=True)

        # Self-heal stale 'processing': flock guarantees a single drainer, so
        # at the start of THIS run any row left 'processing' is from a previous
        # run that was killed mid-row (session/quota cutoff). Reset to
        # 'pending' so it is retried.
        stale = await pool.execute(
            "UPDATE digests SET extraction_status = 'pending' WHERE extraction_status = 'processing'"
        )
        if stale and stale != "UPDATE 0":
            print(f"self-heal: reset stale processing → pending ({stale})", flush=True)

        rows = await pool.fetch(
            "SELECT id FROM digests WHERE extraction_status = 'pending' ORDER BY created_at"
        )
        ids = [r["id"] for r in rows]
        total = len(ids)
        stamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%SZ")
        if not total:
            print(f"[{stamp}] no pending digests — nothing to drain", flush=True)
            return 0

        print(f"[{stamp}] draining {total} pending digests @ concurrency={CONCURRENCY}", flush=True)
        sem = asyncio.Semaphore(CONCURRENCY)
        state = {"done": 0, "ok": 0, "linked": 0, "fail": 0}
        # Monotonic clock: the elapsed-minutes figure cannot jump if the
        # system wall clock is adjusted during a long drain.
        t0 = time.monotonic()

        async def work(did):
            """Enrich one digest and update the shared counters."""
            async with sem:
                try:
                    res = await dl.enrich_digest(did)
                    # Read the result inside the try so an unexpected result
                    # shape is reported as a failure of this row only.
                    linked = bool(res.get("linked_case_law_id"))
                except Exception as e:
                    # Best-effort drain: log and count, then move on.
                    state["fail"] += 1
                    print(f"  FAIL {did}: {type(e).__name__}: {e}", flush=True)
                else:
                    # Count success only once the row fully succeeded, so a
                    # row is never counted as both ok and fail.
                    state["ok"] += 1
                    if linked:
                        state["linked"] += 1
                state["done"] += 1
                if state["done"] % 20 == 0 or state["done"] == total:
                    el = (time.monotonic() - t0) / 60
                    print(
                        f"  progress {state['done']}/{total} | ok={state['ok']} "
                        f"linked={state['linked']} fail={state['fail']} | {el:.1f}min",
                        flush=True,
                    )

        await asyncio.gather(*(work(i) for i in ids))
        done_stamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%SZ")
        print(
            f"[{done_stamp}] DONE {state['done']}/{total} | ok={state['ok']} "
            f"linked={state['linked']} fail={state['fail']} | {(time.monotonic()-t0)/60:.1f}min",
            flush=True,
        )
        return 1 if state["fail"] else 0
    finally:
        # Release the pool even when a query or the gather raised.
        await db.close_pool()
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the drainer and use main()'s return value as the process exit code.
    raise SystemExit(asyncio.run(main()))
|