diff --git a/scripts/SCRIPTS.md b/scripts/SCRIPTS.md index 0333ae4..bc5114f 100644 --- a/scripts/SCRIPTS.md +++ b/scripts/SCRIPTS.md @@ -91,6 +91,7 @@ | `ingest_incoming_batch.py` | python | קליטת batch של החלטות ועדת ערר מ-`data/precedents/incoming/` דרך המסלול הקנוני (`ingest_internal_decision`) + חילוץ מטא-דאטה לכל תיק (המסלול הפנימי לא מתזמן metadata — INV-ING3). רצף (לא מקבילי, להימנע מעומס CLI). רשימת `DECISIONS` נערכת ידנית לכל batch. config מ-`~/.env`. תומך תהליך [[project_precedent_incoming_workflow]]. | ידני, per-batch (חלופה ל-MCP `internal_decision_upload` כש-batch גדול) | | `drain_halacha_queue.py` | python | ריקון תור חילוץ ההלכות (`process_pending_extractions kind='halacha'`) ב-batches של 4 עד שהתור ריק (2 סבבים ריקים). משמש אחרי `ingest_incoming_batch.py`. | ידני אחרי batch (חלופה ל-MCP `precedent_process_pending`) | | `ingest_digests_batch.py` | python | קליטת batch של יומוני "כל יום" מ-`data/digests/incoming/` דרך המסלול העצמאי של קורפוס-הגילוי (`digest_library.ingest_digest`) — חילוץ-LLM (תג-מושג, כותרת-הלכה, מראה-מקום, שני-תאריכים), embedding יחיד, ו-autolink לפסק המקורי (X12/INV-DIG3). רצף (לא מקבילי). מזהה-יומון+תאריך נגזרים משם-הקובץ; העלון החודשי מדולג. **לא מעביר קבצים** — ה-DB (content_hash) הוא מקור-האמת היחיד; הרצה חוזרת מדלגת על קיימים (`exists`). config מ-`~/.env`. | ידני, per-batch (חלופה ל-MCP `digest_upload`) | +| `drain_digests.py` | python | ריקון תור ההעשרה של יומונים (X12): מעבד כל digest בסטטוס `pending` דרך `digest_library.enrich_digest` (חילוץ-LLM Sonnet + embedding + autolink). מקבילי (CONCURRENCY=3, env-tunable), idempotent. מוסיף `~/.local/bin` ל-PATH כדי שה-claude CLI יימצא תחת cron. | **cron יומי** (10:00, אחרי ה-poll של n8n; flock למניעת חפיפה → `data/digests/drain.log`) + ידני אחרי backfill. חלופת-MCP: `digest_process_pending` | ## סקריפטים שנמחקו (git history בלבד) diff --git a/scripts/drain_digests.py b/scripts/drain_digests.py new file mode 100644 index 0000000..5c3e479 --- /dev/null +++ b/scripts/drain_digests.py @@ -0,0 +1,85 @@ +"""Drain the digest enrichment queue (X12) — local LLM enrichment of pending digests. + +The web/n8n upload path creates digest rows with extraction_status='pending' +(container-safe: stage + extract_text only). The LLM metadata extraction + +embedding + autolink MUST run locally (claude_session is local-only — the +``claude`` CLI is not in the container). This script is that local drainer: + + pending digests → digest_library.enrich_digest (Sonnet, tools="") → completed + +Concurrency-limited (avoids LLM rate-limit storms). Idempotent — only touches +rows still 'pending'; safe to re-run. The DB is the single source of truth. + +Used two ways: + 1. Manually after a backfill: mcp-server/.venv/bin/python scripts/drain_digests.py + 2. Daily cron (after the n8n 09:30 Gmail poll) — see crontab; runs under flock + so a slow run never overlaps the next. Logs to data/digests/drain.log. + +claude CLI must be on PATH (the cron line prepends ~/.local/bin). Config +(POSTGRES_URL, VOYAGE_API_KEY) auto-loads from ~/.env via legal_mcp.config. +""" + +import asyncio +import os +import sys +import time +from datetime import datetime, timezone + +# Ensure the local claude CLI is reachable even under a bare cron PATH. +os.environ["PATH"] = os.path.expanduser("~/.local/bin") + os.pathsep + os.environ.get("PATH", "") +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "mcp-server", "src")) + +from legal_mcp.services import db, digest_library as dl # noqa: E402 + +CONCURRENCY = int(os.environ.get("DIGEST_DRAIN_CONCURRENCY", "3")) + + +async def main() -> int: + pool = await db.get_pool() + rows = await pool.fetch( + "SELECT id FROM digests WHERE extraction_status = 'pending' ORDER BY created_at" + ) + ids = [r["id"] for r in rows] + total = len(ids) + stamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%SZ") + if not total: + print(f"[{stamp}] no pending digests — nothing to drain", flush=True) + await db.close_pool() + return 0 + print(f"[{stamp}] draining {total} pending digests @ concurrency={CONCURRENCY}", flush=True) + sem = asyncio.Semaphore(CONCURRENCY) + state = {"done": 0, "ok": 0, "linked": 0, "fail": 0} + t0 = time.time() + + async def work(did): + async with sem: + try: + res = await dl.enrich_digest(did) + state["ok"] += 1 + if res.get("linked_case_law_id"): + state["linked"] += 1 + except Exception as e: + state["fail"] += 1 + print(f" FAIL {did}: {type(e).__name__}: {e}", flush=True) + state["done"] += 1 + if state["done"] % 20 == 0 or state["done"] == total: + el = (time.time() - t0) / 60 + print( + f" progress {state['done']}/{total} | ok={state['ok']} " + f"linked={state['linked']} fail={state['fail']} | {el:.1f}min", + flush=True, + ) + + await asyncio.gather(*[work(i) for i in ids]) + done_stamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%SZ") + print( + f"[{done_stamp}] DONE {state['done']}/{total} | ok={state['ok']} " + f"linked={state['linked']} fail={state['fail']} | {(time.time()-t0)/60:.1f}min", + flush=True, + ) + await db.close_pool() + return 1 if state["fail"] else 0 + + +if __name__ == "__main__": + sys.exit(asyncio.run(main()))