ה-DB (`digests`) הוא מקור-האמת היחיד למצב-קליטה. ingest_digests_batch.py העביר קבצים incoming→processed/ — state מבוסס-תיקיות מקביל ל-DB (הפרת-G2 קטנה). - הוסר ה-move ל-processed/ + import shutil + PROCESSED. הסקריפט מסתמך על dedup ב-content_hash (ingest_digest מחזיר 'exists' לקיימים) → הרצה חוזרת בטוחה. - תיקיות (incoming/) = staging בלבד, לא state. - X12 INV-DIG2: תועד מקור-אמת-יחיד + ההפרה-שתוקנה (processed/). - SCRIPTS.md עודכן. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
139 lines
5.1 KiB
Python
139 lines
5.1 KiB
Python
"""Batch ingest of "כל יום" daily digests staged in data/digests/incoming/ (X12).
|
|
|
|
Sequential (NOT concurrent — same load-spike caution as ingest_incoming_batch.py)
|
|
ingest of each yomon PDF via the standalone digest pipeline
|
|
(``digest_library.ingest_digest``), which:
|
|
- extracts text, dedups on content_hash (idempotent),
|
|
- runs the local LLM metadata extractor (concept_tag, headline, underlying
|
|
citation, two dates, practice_area, subject_tags),
|
|
- stores a single embedding,
|
|
- auto-links to the underlying ruling if it is already in the precedent
|
|
library (INV-DIG3).
|
|
|
|
The digest is a SECONDARY, radar-only source — it never enters the precedent /
|
|
halacha pipeline and is never cited in a decision (INV-DIG1/2). After this run,
|
|
relink unmatched digests once the originals are uploaded, or surface them via
|
|
missing_precedent_create.
|
|
|
|
SINGLE SOURCE OF TRUTH: the `digests` table (DB) is the ONLY authority for what
|
|
has been ingested. This script does NOT move files between folders — re-running
|
|
is safe because ``ingest_digest`` dedups on content_hash (already-ingested →
|
|
returns ``exists``). Files left in ``incoming/`` are simply re-checked and
|
|
skipped. (Earlier versions moved files to a ``processed/`` folder; that created
|
|
a second, divergent state and was removed.)
|
|
|
|
Yomon number + issue date are parsed from the filename
|
|
("יומון 5158 - 31.5.26.pdf") as hints; the LLM also extracts them from the
|
|
body and the explicit hint wins. The monthly bulletin (e.g. "201 יוני.pdf") is
|
|
multi-topic and skipped (Phase 3).
|
|
|
|
Run: mcp-server/.venv/bin/python scripts/ingest_digests_batch.py
|
|
(optionally pass explicit file paths as args; these are ingested as given, without the "יומון" filename filter)
|
|
Config (POSTGRES_URL, VOYAGE_API_KEY, ANTHROPIC_API_KEY) auto-loads from ~/.env.
|
|
"""
|
|
|
|
import asyncio
|
|
import os
|
|
import re
|
|
import sys
|
|
import traceback
|
|
from pathlib import Path
|
|
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "mcp-server", "src"))
|
|
|
|
from legal_mcp import config # noqa: E402
|
|
from legal_mcp.services import digest_library as svc # noqa: E402
|
|
|
|
# Staging folder scanned by _discover(). Files here are staging only, not
# ingest state — the `digests` table is the single source of truth.
INCOMING = Path(config.DATA_DIR).joinpath("digests", "incoming")
|
|
|
|
# Matches a single-yomon filename such as "יומון 5158 - 31.5.26" and captures
# four groups: (yomon_number, day, month, year) → ("5158", "31", "5", "26").
# The year may have 2-4 digits; _parse_name() adds 2000 to years below 100.
_NAME_RE = re.compile(r"יומון\s*(\d+)\s*-\s*(\d{1,2})\.(\d{1,2})\.(\d{2,4})")
|
|
|
|
|
|
def _parse_name(fname: str) -> tuple[str, str | None]:
|
|
"""Return (yomon_number, iso_date_or_None) parsed from the filename."""
|
|
m = _NAME_RE.search(fname)
|
|
if not m:
|
|
return "", None
|
|
num, dd, mm, yy = m.groups()
|
|
year = int(yy)
|
|
if year < 100:
|
|
year += 2000
|
|
try:
|
|
iso = f"{year:04d}-{int(mm):02d}-{int(dd):02d}"
|
|
except ValueError:
|
|
iso = None
|
|
return num, iso
|
|
|
|
|
|
def _discover() -> list[Path]:
    """Return the single-yomon PDFs staged in INCOMING, in sorted path order.

    PDFs whose name lacks "יומון" (e.g. the multi-topic monthly bulletin) are
    announced on stdout and left out. A missing INCOMING folder yields [].
    """
    candidates = sorted(INCOMING.glob("*.pdf")) if INCOMING.exists() else []
    found: list[Path] = []
    for pdf in candidates:
        if "יומון" in pdf.name:
            found.append(pdf)
        else:
            print(f"⊘ skip (not a single yomon): {pdf.name}", flush=True)
    return found
|
|
|
|
|
|
async def _ingest_one(fp: Path) -> dict:
    """Ingest a single digest PDF and return its summary record.

    The record always holds ``"file"``. On success it gets the status, digest
    id, yomon number, underlying citation and linked case-law id reported by
    ``svc.ingest_digest``. When the file is missing or ingestion raises, an
    ``"error"`` key is set instead. Failures are printed (exceptions with a
    traceback) and recorded rather than raised, so one bad file does not stop
    the batch.
    """
    rec: dict = {"file": fp.name}
    if not fp.exists():
        rec["error"] = "file-missing"
        print(f"✗ {fp.name}: file missing", flush=True)
        return rec
    yomon_number, iso_date = _parse_name(fp.name)
    try:
        out = await svc.ingest_digest(
            file_path=fp,
            yomon_number=yomon_number,
            digest_date=iso_date,
        )
        rec.update({
            "status": out.get("status"),
            "digest_id": out.get("digest_id"),
            "yomon_number": out.get("yomon_number"),
            "underlying_citation": out.get("underlying_citation"),
            "linked_case_law_id": out.get("linked_case_law_id"),
        })
        link = "🔗 linked" if out.get("linked_case_law_id") else "⚠ unlinked"
        print(
            f"✓ {fp.name}: {out.get('status')} | yomon={out.get('yomon_number')} | "
            f"{link} | {out.get('underlying_citation')}",
            flush=True,
        )
        # No folder move — the DB (content_hash) is the single source of
        # truth. Re-running re-checks incoming/ and skips already-ingested.
    except Exception as e:  # per-file boundary: record the failure, keep going
        rec["error"] = f"{type(e).__name__}: {e}"
        print(f"✗ {fp.name}: {e}", flush=True)
        traceback.print_exc()
    return rec


def _print_summary(results: list[dict]) -> None:
    """Print every per-file record followed by linked/unlinked/failed totals."""
    print("\n===SUMMARY===", flush=True)
    for r in results:
        print(r, flush=True)
    linked = sum(1 for r in results if r.get("linked_case_law_id"))
    unlinked = sum(
        1 for r in results
        if r.get("status") in ("completed", "exists") and not r.get("linked_case_law_id")
    )
    # Failed files carry no status, so without this count they would silently
    # drop out of the linked/unlinked totals.
    failed = sum(1 for r in results if "error" in r)
    print(
        f"\nTotal: {len(results)} | linked: {linked} | "
        f"unlinked (need precedent upload): {unlinked} | failed: {failed}",
        flush=True,
    )


async def main(argv: list[str]) -> None:
    """Ingest the given PDF paths (or everything _discover() finds) and summarize.

    Args:
        argv: Explicit PDF paths from the command line. When empty, the
            single-yomon PDFs staged in INCOMING are used instead.
    """
    files = [Path(a) for a in argv] if argv else _discover()
    if not files:
        print(f"No yomon PDFs found in {INCOMING}", flush=True)
        return

    results: list[dict] = []
    # Deliberately sequential (one ingest awaited at a time) to avoid load spikes.
    for fp in files:
        results.append(await _ingest_one(fp))

    _print_summary(results)
|
|
|
|
|
|
if __name__ == "__main__":
    # Every argument after the script name is treated as an explicit PDF path.
    cli_paths = sys.argv[1:]
    asyncio.run(main(cli_paths))
|