"""Orchestrator for the Digests radar (X12). A digest ("כל יום" daily one-pager) is a SECONDARY source that POINTS at a ruling — it is never cited in a decision (INV-DIG1) and never enters the precedent/halacha pipeline (INV-DIG2). Ingest reuses only ATOMIC services (extract_text, embeddings), NOT the canonical ``ingest.ingest_document``. Two intake paths share one enrichment core: - ``ingest_digest`` (local/MCP, e.g. batch script) — does everything synchronously: stage → extract_text → create → LLM enrich → embed → autolink → completed. - ``create_pending_digest`` (CONTAINER-SAFE — the web upload) — stage → extract_text → create row with status='pending'. No LLM, no embedding. ``process_pending_digests`` (local/MCP) drains the queue and enriches. claude_session rule: ``digest_metadata_extractor`` (local CLI) is imported LAZILY inside the enrichment core only, so this module stays import-safe from the FastAPI container for create_pending / search / list / link / delete (DB + voyage only — voyage embedding only runs in the local enrich path). """ from __future__ import annotations import logging from datetime import date from pathlib import Path from typing import Awaitable, Callable from uuid import UUID from legal_mcp import config from legal_mcp.services import db, embeddings, extractor, ingest logger = logging.getLogger(__name__) ProgressCb = Callable[[str, int, str], Awaitable[None]] DIGEST_LIBRARY_DIR = Path(config.DATA_DIR) / "digests" _VALID_PRACTICE_AREAS = frozenset( {"", "rishuy_uvniya", "betterment_levy", "compensation_197"} ) async def _noop_progress(_status: str, _percent: int, _msg: str) -> None: return None def _coerce_date(v) -> date | None: if v is None or v == "": return None if isinstance(v, date): return v if isinstance(v, str): try: return date.fromisoformat(v[:10]) except ValueError: return None return None def _embedding_text(row: dict) -> str: """The single vector indexes the digest as an atomic discovery unit.""" parts = [ row.get("concept_tag", ""), row.get("headline_holding", ""), row.get("summary", ""), row.get("analysis_text", ""), ] return "\n".join(p for p in parts if p).strip() async def try_autolink(digest_id: UUID | str, underlying_citation: str) -> str | None: """Best-effort link of a digest to the underlying ruling in case_law (INV-DIG3). Returns the case_law_id (str) if linked, else None. Never raises.""" citation = (underlying_citation or "").strip() if not citation: return None try: match = await db.find_case_law_by_citation_fuzzy(citation) except Exception as e: logger.warning("digest try_autolink lookup failed for %r: %s", citation, e) return None if not match: # Gap (INV-DIG3): the underlying ruling isn't in the corpus. If it's a # court verdict (not ועדת-ערר), enqueue an X13 auto-fetch job so the gap # is actionable instead of silently dropped (INV-CF2). Never raises. await _enqueue_court_fetch(digest_id, citation) return None await db.link_digest_to_case_law(digest_id, match["id"]) return str(match["id"]) async def _enqueue_court_fetch(digest_id: UUID | str, citation: str) -> None: """Queue an X13 court-verdict fetch for an unlinked digest citation. Court rulings (supreme/admin) → a ``court_fetch_jobs`` row drained later by ``court_fetch_drain``. ועדת-ערר (skip) is left alone — it needs Nevo and is surfaced through the normal missing-precedent path, not auto-fetch. """ try: from legal_mcp.services import court_citation cit = court_citation.classify(citation) if cit.tier not in ("supreme", "admin"): return await db.court_fetch_job_upsert( case_number_norm=cit.case_number_norm, citation_raw=citation, tier=cit.tier, court=cit.court_prefix, digest_id=UUID(str(digest_id)), ) logger.info("digest %s: enqueued court-fetch for %r (tier=%s)", digest_id, citation, cit.tier) except Exception as e: # never break digest ingest logger.warning("digest court-fetch enqueue failed for %r: %s", citation, e) # ── Container-safe creation (web upload) — no LLM, no embedding ────── async def create_pending_digest( *, file_path: str | Path, yomon_number: str = "", digest_date: date | str | None = None, practice_area: str = "", appeal_subtype: str = "", subject_tags: list[str] | None = None, progress: ProgressCb | None = None, ) -> dict: """Stage the file, extract text (PyMuPDF — container-safe), and create a digest row with extraction_status='pending'. The LLM metadata extraction, embedding, and autolink are deferred to ``process_pending_digests`` (local). Returns {status, digest_id, extraction_status} or {status:'exists', ...}. Idempotent on content_hash (INV-G3). """ progress = progress or _noop_progress if practice_area and practice_area not in _VALID_PRACTICE_AREAS: raise ValueError(f"invalid practice_area: {practice_area!r}") src = Path(file_path) if not src.exists(): raise ValueError(f"file not found: {file_path}") await progress("staging", 10, "מעתיק קובץ") staged = ingest._stage_file(src, DIGEST_LIBRARY_DIR, "incoming") rel_path = str(staged.relative_to(config.DATA_DIR)) \ if str(staged).startswith(str(config.DATA_DIR)) else str(staged) await progress("extracting_text", 50, "מחלץ טקסט") raw_text, _pc, _off = await extractor.extract_text(str(staged)) raw_text = (raw_text or "").strip() if not raw_text: raise ValueError("no text extracted from digest") content_hash = db._content_hash(raw_text) existing = await db.get_digest_by_content_hash(content_hash) if existing: await progress("completed", 100, "יומון זהה כבר קיים") return {"status": "exists", "digest_id": existing["id"], "extraction_status": existing.get("extraction_status")} record = await db.create_digest( analysis_text=raw_text, yomon_number=yomon_number.strip(), digest_date=_coerce_date(digest_date), practice_area=practice_area, appeal_subtype=appeal_subtype.strip(), subject_tags=list(subject_tags) if subject_tags else [], source_document_path=rel_path, extraction_status="pending", ) await progress("queued", 100, "ממתין לעיבוד מקומי (LLM)") return {"status": "pending", "digest_id": record["id"], "extraction_status": "pending"} # ── Local enrichment core (LLM + embed + autolink) ────────────────── async def enrich_digest(digest_id: UUID | str, progress: ProgressCb | None = None) -> dict: """Run LLM metadata extraction over a digest's analysis_text, fill ONLY empty fields (preserve user-supplied values), embed, autolink, complete. **MCP-tool-only path** (uses the local LLM extractor). Idempotent. """ progress = progress or _noop_progress row = await db.get_digest(digest_id) if not row: raise ValueError("digest not found") analysis = (row.get("analysis_text") or "").strip() if not analysis: await db.update_digest(digest_id, extraction_status="failed") return {"status": "no_text", "digest_id": str(digest_id)} await db.update_digest(digest_id, extraction_status="processing") await progress("extracting_metadata", 40, "מחלץ מטא-דאטה (LLM)") from legal_mcp.services import digest_metadata_extractor extracted = await digest_metadata_extractor.extract(analysis) # Fill only empty fields (preserve user-supplied values from the form). fields: dict = {} for key in ("yomon_number", "concept_tag", "headline_holding", "summary", "underlying_citation", "underlying_court", "underlying_judge", "practice_area", "appeal_subtype"): if not (row.get(key) or "").strip() and extracted.get(key): fields[key] = extracted[key] if row.get("digest_date") is None and extracted.get("digest_date"): fields["digest_date"] = extracted["digest_date"] if row.get("underlying_date") is None and extracted.get("underlying_date"): fields["underlying_date"] = extracted["underlying_date"] if not (row.get("subject_tags") or []) and extracted.get("subject_tags"): fields["subject_tags"] = extracted["subject_tags"] if fields: await db.update_digest(digest_id, **fields) merged = await db.get_digest(digest_id) await progress("embedding", 75, "מחשב embedding") emb_text = _embedding_text(merged) if emb_text: try: vecs = await embeddings.embed_texts([emb_text], input_type="document") if vecs: await db.store_digest_embedding(digest_id, vecs[0]) except Exception as e: # surfaced, not swallowed (§6) logger.warning("digest embedding failed for %s: %s", digest_id, e) await progress("linking", 90, "מנסה לקשר לפסק המקורי") linked_id = None if not merged.get("linked_case_law_id"): linked_id = await try_autolink(digest_id, merged.get("underlying_citation", "")) await db.update_digest(digest_id, extraction_status="completed") await progress("completed", 100, "הושלם") return { "status": "completed", "digest_id": str(digest_id), "yomon_number": merged.get("yomon_number", ""), "underlying_citation": merged.get("underlying_citation", ""), "linked_case_law_id": merged.get("linked_case_law_id") or linked_id, "fields_filled": sorted(fields.keys()), } async def process_pending_digests(limit: int = 20) -> dict: """Drain the digest extraction queue (rows stamped extraction_status='pending' by the web upload). Local/MCP only — runs the LLM enrichment per row. Sequential (avoids LLM rate-limit storms), mirrors process_pending_extractions.""" pending = await db.list_pending_digests(limit=limit) if not pending: return {"status": "no_pending", "processed": 0, "results": []} results = [] processed = 0 for row in pending: did = row["id"] try: res = await enrich_digest(did) processed += 1 results.append({"digest_id": str(did), "status": res.get("status"), "linked": bool(res.get("linked_case_law_id"))}) except Exception as e: logger.exception("process_pending_digests failed for %s: %s", did, e) try: await db.update_digest(did, extraction_status="failed") except Exception: logger.exception("could not mark digest %s failed", did) results.append({"digest_id": str(did), "status": "failed", "error": str(e)}) return {"status": "completed", "processed": processed, "total_pending": len(pending), "results": results} # ── Full synchronous ingest (local/MCP, e.g. batch script) ────────── async def ingest_digest( *, file_path: str | Path, yomon_number: str = "", digest_date: date | str | None = None, practice_area: str = "", appeal_subtype: str = "", subject_tags: list[str] | None = None, progress: ProgressCb | None = None, ) -> dict: """Ingest one digest synchronously. **MCP-tool-only** (uses the LLM). Creates the row (with any user-supplied values) then enriches in place. Idempotent on content_hash (INV-G3). """ progress = progress or _noop_progress created = await create_pending_digest( file_path=file_path, yomon_number=yomon_number, digest_date=digest_date, practice_area=practice_area, appeal_subtype=appeal_subtype, subject_tags=subject_tags, progress=progress, ) if created.get("status") == "exists": return created digest_id = created["digest_id"] enriched = await enrich_digest(digest_id, progress=progress) return enriched # ── Linking (INV-DIG3) ────────────────────────────────────────────── async def link_digest(digest_id: UUID | str, case_law_id: UUID | str) -> dict: """Manually link a digest to an underlying ruling (INV-DIG3). Idempotent.""" digest = await db.get_digest(digest_id) if not digest: raise ValueError("digest not found") ruling = await db.get_case_law( case_law_id if isinstance(case_law_id, UUID) else UUID(str(case_law_id)) ) if not ruling: raise ValueError("case_law not found") updated = await db.link_digest_to_case_law(digest_id, case_law_id) return { "linked": True, "digest_id": str(digest_id), "case_law_id": str(case_law_id), "case_number": ruling.get("case_number"), "digest": updated, } async def relink_digest(digest_id: UUID | str) -> dict: """Re-run autolink for an unlinked digest. No-op if already linked / no match.""" digest = await db.get_digest(digest_id) if not digest: raise ValueError("digest not found") if digest.get("linked_case_law_id"): return {"linked": True, "digest_id": str(digest_id), "case_law_id": digest["linked_case_law_id"], "changed": False} linked_id = await try_autolink(digest_id, digest.get("underlying_citation", "")) return { "linked": linked_id is not None, "digest_id": str(digest_id), "case_law_id": linked_id, "changed": linked_id is not None, } async def unlink_digest(digest_id: UUID | str) -> dict: """Clear a digest's link to the underlying ruling.""" updated = await db.link_digest_to_case_law(digest_id, None) if updated is None: raise ValueError("digest not found") return {"unlinked": True, "digest_id": str(digest_id)} # ── Read / search (container-safe: DB + voyage) ───────────────────── async def search_digests( query: str, practice_area: str = "", subject_tag: str = "", concept_tag: str = "", limit: int = 10, ) -> list[dict]: """Semantic search over the digests radar. Container-safe (voyage + DB).""" if not query.strip(): return [] query_vec = await embeddings.embed_query(query) return await db.search_digests_semantic( query_embedding=query_vec, practice_area=practice_area, subject_tag=subject_tag, concept_tag=concept_tag, limit=limit, ) async def get_digest(digest_id: UUID | str) -> dict | None: return await db.get_digest(digest_id) async def list_digests( practice_area: str = "", concept_tag: str = "", linked: bool | None = None, search: str = "", limit: int = 100, offset: int = 0, ) -> list[dict]: return await db.list_digests( practice_area=practice_area, concept_tag=concept_tag, linked=linked, search=search, limit=limit, offset=offset, ) async def update_digest(digest_id: UUID | str, **fields) -> dict | None: return await db.update_digest(digest_id, **fields) async def delete_digest(digest_id: UUID | str) -> bool: return await db.delete_digest(digest_id)