legal-ai/scripts/ingest_digests_batch.py
Chaim fb40ec8565 refactor(digests): single source of truth — drop processed/ folder state (X12)
The DB (`digests`) is the single source of truth for ingestion state. ingest_digests_batch.py used to
move files incoming→processed/, a folder-based state running parallel to the DB (a minor G2 violation).

- Removed the move to processed/, plus `import shutil` and PROCESSED. The script relies on
  dedup by content_hash (ingest_digest returns 'exists' for already-ingested files) → re-running is safe.
- Folders (incoming/) are staging only, not state.
- X12 INV-DIG2: documented the single source of truth and the violation that was fixed (processed/).
- SCRIPTS.md updated.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 20:33:18 +00:00

"""Batch ingest of "כל יום" daily digests staged in data/digests/incoming/ (X12).

Ingests each yomon PDF sequentially (NOT concurrently; same load-spike caution
as ingest_incoming_batch.py) via the standalone digest pipeline
(``digest_library.ingest_digest``), which:

- extracts text and dedups on content_hash (idempotent),
- runs the local LLM metadata extractor (concept_tag, headline, underlying
  citation, two dates, practice_area, subject_tags),
- stores a single embedding,
- auto-links to the underlying ruling if it is already in the precedent
  library (INV-DIG3).

The digest is a SECONDARY, radar-only source: it never enters the precedent /
halacha pipeline and is never cited in a decision (INV-DIG1/2). After this run,
relink unmatched digests once the originals are uploaded, or surface them via
missing_precedent_create.

SINGLE SOURCE OF TRUTH: the `digests` table (DB) is the ONLY authority for what
has been ingested. This script does NOT move files between folders. Re-running
is safe because ``ingest_digest`` dedups on content_hash (an already-ingested
file returns ``exists``), so files left in ``incoming/`` are simply re-checked
and skipped. (Earlier versions moved files to a ``processed/`` folder; that
created a second, divergent state and was removed.)

The yomon number and issue date are parsed from the filename
("יומון 5158 - 31.5.26.pdf") as hints; the LLM also extracts them from the
body, and the explicit hint wins. The monthly bulletin (e.g. "201 יוני.pdf") is
multi-topic and is skipped (Phase 3).

Run: mcp-server/.venv/bin/python scripts/ingest_digests_batch.py
     (optionally pass explicit file paths as args)

Config (POSTGRES_URL, VOYAGE_API_KEY, ANTHROPIC_API_KEY) auto-loads from ~/.env.
"""

import asyncio
import os
import re
import sys
import traceback
from datetime import date
from pathlib import Path

sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "mcp-server", "src"))

from legal_mcp import config  # noqa: E402
from legal_mcp.services import digest_library as svc  # noqa: E402

INCOMING = Path(config.DATA_DIR) / "digests" / "incoming"

# Matches "יומון 5158 - 31.5.26" → groups ("5158", "31", "5", "26")
_NAME_RE = re.compile(r"יומון\s*(\d+)\s*-\s*(\d{1,2})\.(\d{1,2})\.(\d{2,4})")


def _parse_name(fname: str) -> tuple[str, str | None]:
    """Return (yomon_number, iso_date_or_None) parsed from the filename."""
    m = _NAME_RE.search(fname)
    if not m:
        return "", None
    num, dd, mm, yy = m.groups()
    year = int(yy)
    if year < 100:  # two-digit year, e.g. "26" → 2026
        year += 2000
    try:
        # date() raises ValueError on impossible dates (e.g. 31.2.26), so a
        # malformed filename yields no date hint instead of a bogus ISO string.
        iso = date(year, int(mm), int(dd)).isoformat()
    except ValueError:
        iso = None
    return num, iso


def _discover() -> list[Path]:
    """Return single-yomon PDFs staged in incoming/, skipping multi-topic bulletins."""
    if not INCOMING.exists():
        return []
    out = []
    for p in sorted(INCOMING.glob("*.pdf")):
        if "יומון" not in p.name:
            print(f"⊘ skip (not a single yomon): {p.name}", flush=True)
            continue
        out.append(p)
    return out


async def main(argv: list[str]) -> None:
    files = [Path(a) for a in argv] if argv else _discover()
    if not files:
        print(f"No yomon PDFs found in {INCOMING}", flush=True)
        return

    results = []
    for fp in files:
        rec = {"file": fp.name}
        if not fp.exists():
            rec["error"] = "file-missing"
            print(f"{fp.name}: file missing", flush=True)
            results.append(rec)
            continue

        yomon_number, iso_date = _parse_name(fp.name)
        try:
            out = await svc.ingest_digest(
                file_path=fp,
                yomon_number=yomon_number,
                digest_date=iso_date,
            )
            rec.update({
                "status": out.get("status"),
                "digest_id": out.get("digest_id"),
                "yomon_number": out.get("yomon_number"),
                "underlying_citation": out.get("underlying_citation"),
                "linked_case_law_id": out.get("linked_case_law_id"),
            })
            link = "🔗 linked" if out.get("linked_case_law_id") else "⚠ unlinked"
            print(
                f"{fp.name}: {out.get('status')} | yomon={out.get('yomon_number')} | "
                f"{link} | {out.get('underlying_citation')}",
                flush=True,
            )
            # No folder move: the DB (content_hash) is the single source of
            # truth. Re-running re-checks incoming/ and skips already-ingested.
        except Exception as e:
            rec["error"] = f"{type(e).__name__}: {e}"
            print(f"{fp.name}: {e}", flush=True)
            traceback.print_exc()
        results.append(rec)

    print("\n===SUMMARY===", flush=True)
    for r in results:
        print(r, flush=True)
    linked = sum(1 for r in results if r.get("linked_case_law_id"))
    unlinked = sum(
        1 for r in results
        if r.get("status") in ("completed", "exists") and not r.get("linked_case_law_id")
    )
    print(
        f"\nTotal: {len(results)} | linked: {linked} | unlinked (need precedent upload): {unlinked}",
        flush=True,
    )


if __name__ == "__main__":
    asyncio.run(main(sys.argv[1:]))