"""Drain the digest enrichment queue (X12) — local LLM enrichment of pending digests. The web/n8n upload path creates digest rows with extraction_status='pending' (container-safe: stage + extract_text only). The LLM metadata extraction + embedding + autolink MUST run locally (claude_session is local-only — the ``claude`` CLI is not in the container). This script is that local drainer: pending digests → digest_library.enrich_digest (Sonnet, tools="") → completed Concurrency-limited (avoids LLM rate-limit storms). Idempotent — only touches rows still 'pending'; safe to re-run. The DB is the single source of truth. Used two ways: 1. Manually after a backfill: mcp-server/.venv/bin/python scripts/drain_digests.py 2. Daily cron (after the n8n 09:30 Gmail poll) — see crontab; runs under flock so a slow run never overlaps the next. Logs to data/digests/drain.log. claude CLI must be on PATH (the cron line prepends ~/.local/bin). Config (POSTGRES_URL, VOYAGE_API_KEY) auto-loads from ~/.env via legal_mcp.config. """ import asyncio import os import sys import time from datetime import datetime, timezone # Ensure the local claude CLI is reachable even under a bare cron PATH. os.environ["PATH"] = os.path.expanduser("~/.local/bin") + os.pathsep + os.environ.get("PATH", "") sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "mcp-server", "src")) from legal_mcp.services import db, digest_library as dl # noqa: E402 CONCURRENCY = int(os.environ.get("DIGEST_DRAIN_CONCURRENCY", "3")) async def main() -> int: pool = await db.get_pool() # get_pool() runs schema migrations first — incl. the V32 digest_kind backfill # that classifies legacy rows — so the failure check below is safe from the # very first run (no legacy row has digest_kind=''). # # Self-heal: a successful enrich ALWAYS sets digest_kind (decision/announcement # /other). So a 'completed' row with digest_kind='' means the extraction never # landed (e.g. the local claude subscription window was exhausted) — reset to # 'pending' to retry (idempotent auto-resume). This correctly does NOT touch # announcements (digest_kind='announcement', legitimately no citation), which # the old "both fields empty" heuristic wrongly retried forever. healed = await pool.execute( "UPDATE digests SET extraction_status = 'pending' " "WHERE extraction_status = 'completed' " "AND coalesce(digest_kind,'') = '' " "AND coalesce(analysis_text,'') <> ''" ) if healed and healed != "UPDATE 0": print(f"self-heal: reset unclassified (failed) digests → pending ({healed})", flush=True) # Self-heal stale 'processing': flock guarantees a single drainer, so at the # start of THIS run any row left 'processing' is from a previous run that was # killed mid-row (session/quota cutoff). Reset to 'pending' so it is retried. stale = await pool.execute( "UPDATE digests SET extraction_status = 'pending' WHERE extraction_status = 'processing'" ) if stale and stale != "UPDATE 0": print(f"self-heal: reset stale processing → pending ({stale})", flush=True) rows = await pool.fetch( "SELECT id FROM digests WHERE extraction_status = 'pending' ORDER BY created_at" ) ids = [r["id"] for r in rows] total = len(ids) stamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%SZ") if not total: print(f"[{stamp}] no pending digests — nothing to drain", flush=True) await db.close_pool() return 0 print(f"[{stamp}] draining {total} pending digests @ concurrency={CONCURRENCY}", flush=True) sem = asyncio.Semaphore(CONCURRENCY) state = {"done": 0, "ok": 0, "linked": 0, "fail": 0} t0 = time.time() async def work(did): async with sem: try: res = await dl.enrich_digest(did) state["ok"] += 1 if res.get("linked_case_law_id"): state["linked"] += 1 except Exception as e: state["fail"] += 1 print(f" FAIL {did}: {type(e).__name__}: {e}", flush=True) state["done"] += 1 if state["done"] % 20 == 0 or state["done"] == total: el = (time.time() - t0) / 60 print( f" progress {state['done']}/{total} | ok={state['ok']} " f"linked={state['linked']} fail={state['fail']} | {el:.1f}min", flush=True, ) await asyncio.gather(*[work(i) for i in ids]) done_stamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%SZ") print( f"[{done_stamp}] DONE {state['done']}/{total} | ok={state['ok']} " f"linked={state['linked']} fail={state['fail']} | {(time.time()-t0)/60:.1f}min", flush=True, ) await db.close_pool() return 1 if state["fail"] else 0 if __name__ == "__main__": sys.exit(asyncio.run(main()))