"""Orchestrator for the Digests radar (X12).

A digest ("כל יום", lit. "every day": a daily one-pager) is a SECONDARY source
that POINTS at a ruling — it is never cited in a decision (INV-DIG1) and never
enters the precedent/halacha pipeline (INV-DIG2). Ingest is therefore a short,
standalone path that reuses only ATOMIC services (extract_text, embeddings),
NOT the canonical ``ingest.ingest_document`` (which is bound to case_law):

    file → extract_text → content_hash (idempotent)
         → LLM metadata extract → create_digest
         → single embedding (concept+headline+summary+analysis)
         → try_autolink(underlying_citation → case_law)   [INV-DIG3]
         → extraction_status='completed'

claude_session rule: ``digest_metadata_extractor`` (local CLI) is imported
LAZILY inside ``ingest_digest`` only, so this module is import-safe from the
FastAPI container for the search/list/link/delete paths (DB + voyage only).
"""

from __future__ import annotations

import logging
from datetime import date
from pathlib import Path
from typing import Awaitable, Callable
from uuid import UUID

from legal_mcp import config
from legal_mcp.services import db, embeddings, extractor, ingest

logger = logging.getLogger(__name__)

# Progress callback: (status, percent, human-readable message).
ProgressCb = Callable[[str, int, str], Awaitable[None]]

# Library root handed to ingest._stage_file for digest source files.
DIGEST_LIBRARY_DIR = Path(config.DATA_DIR) / "digests"

# Accepted practice_area values; "" means "not specified".
_VALID_PRACTICE_AREAS = frozenset(
    {"", "rishuy_uvniya", "betterment_levy", "compensation_197"}
)


async def _noop_progress(_status: str, _percent: int, _msg: str) -> None:
    """Default progress callback: discard every update."""
    return None


def _embedding_text(fields: dict) -> str:
    """The single vector indexes the digest as an atomic discovery unit.

    Joins the non-empty concept_tag, headline_holding, summary and
    analysis_text values with newlines, in that order.
    """
    parts = [
        fields.get("concept_tag", ""),
        fields.get("headline_holding", ""),
        fields.get("summary", ""),
        fields.get("analysis_text", ""),
    ]
    return "\n".join(p for p in parts if p).strip()


def _coerce_date(value: object) -> date | None:
    """Best-effort conversion of *value* to a ``date``.

    ``date`` instances (including ``datetime``, a subclass) pass through
    unchanged. Strings are parsed from their first 10 characters as ISO
    ``YYYY-MM-DD``. Everything else — ``None``, ``""``, unparseable strings,
    other types — yields ``None``.
    """
    if isinstance(value, date):
        return value
    if isinstance(value, str) and value:
        try:
            return date.fromisoformat(value[:10])
        except ValueError:
            return None
    return None


def _relative_to_data_dir(path: Path) -> str:
    """Return *path* relative to ``config.DATA_DIR`` if it lies under it, else ``str(path)``.

    Uses ``Path.relative_to`` directly rather than a string-prefix test: a
    prefix test wrongly accepts sibling directories (``/data2`` vs ``/data``),
    after which ``relative_to`` would raise.
    """
    try:
        return str(path.relative_to(config.DATA_DIR))
    except ValueError:
        return str(path)


def _extracted_str(extracted: dict, key: str) -> str:
    """Return ``extracted[key]`` as a stripped string; missing or ``None`` gives ``""``.

    Guards against the extractor yielding ``None`` (e.g. from a JSON null) or a
    non-string scalar, which ``extracted.get(key, "")`` alone would let through.
    """
    value = extracted.get(key)
    if value is None:
        return ""
    return value.strip() if isinstance(value, str) else str(value).strip()


def _extracted_tags(extracted: dict) -> list:
    """Return the extracted subject_tags as a list (``[]`` when absent or unusable)."""
    tags = extracted.get("subject_tags")
    if isinstance(tags, (list, tuple)):
        return list(tags)
    if isinstance(tags, str) and tags.strip():
        return [tags.strip()]
    return []


def _extracted_practice_area(extracted: dict) -> str:
    """Return the LLM's practice_area if it is a known value, else ``""``.

    The user-supplied argument is validated up front in ``ingest_digest``; the
    LLM value gets the same whitelist so an unknown value never reaches the DB.
    """
    candidate = _extracted_str(extracted, "practice_area")
    if candidate not in _VALID_PRACTICE_AREAS:
        logger.warning(
            "digest LLM returned unknown practice_area %r; leaving it empty", candidate
        )
        return ""
    return candidate


def _merge_fields(
    extracted: dict,
    *,
    raw_text: str,
    rel_path: str,
    yomon_number: str,
    digest_date: date | str | None,
    practice_area: str,
    appeal_subtype: str,
    subject_tags: list[str] | None,
) -> dict:
    """Build the ``db.create_digest`` kwargs: explicit user args win, else LLM values."""
    return {
        "analysis_text": raw_text,
        "yomon_number": yomon_number.strip() or _extracted_str(extracted, "yomon_number"),
        "digest_date": _coerce_date(digest_date)
        or _coerce_date(extracted.get("digest_date")),
        "concept_tag": _extracted_str(extracted, "concept_tag"),
        "headline_holding": _extracted_str(extracted, "headline_holding"),
        "summary": _extracted_str(extracted, "summary"),
        "underlying_citation": _extracted_str(extracted, "underlying_citation"),
        "underlying_court": _extracted_str(extracted, "underlying_court"),
        "underlying_date": _coerce_date(extracted.get("underlying_date")),
        "underlying_judge": _extracted_str(extracted, "underlying_judge"),
        "practice_area": practice_area or _extracted_practice_area(extracted),
        "appeal_subtype": appeal_subtype.strip()
        or _extracted_str(extracted, "appeal_subtype"),
        "subject_tags": list(subject_tags) if subject_tags else _extracted_tags(extracted),
        "source_document_path": rel_path,
        "extraction_status": "processing",
    }


async def _store_embedding(digest_id: UUID | str, fields: dict) -> bool:
    """Embed the digest's discovery text and store the vector (X12 §6).

    Returns True only if a vector was stored. Failures are logged and
    reported through the return value rather than raised, so a missing
    vector does not abort ingest.
    """
    emb_text = _embedding_text(fields)
    if not emb_text:
        return False
    try:
        vecs = await embeddings.embed_texts([emb_text], input_type="document")
        if not vecs:
            return False
        await db.store_digest_embedding(digest_id, vecs[0])
    except Exception as e:  # surfaced (log + return value), not swallowed (§6)
        logger.warning("digest embedding failed for %s: %s", digest_id, e)
        return False
    return True


async def try_autolink(digest_id: UUID | str, underlying_citation: str) -> str | None:
    """Best-effort link of a digest to the underlying ruling in case_law (INV-DIG3).

    Returns the case_law_id (str) if linked, else None. Never raises: lookup
    and link failures are both logged and reported as "not linked".
    """
    citation = (underlying_citation or "").strip()
    if not citation:
        return None
    try:
        match = await db.find_case_law_by_citation_fuzzy(citation)
    except Exception as e:
        logger.warning("digest try_autolink lookup failed for %r: %s", citation, e)
        return None
    if not match:
        return None
    try:
        case_law_id = match["id"]
        await db.link_digest_to_case_law(digest_id, case_law_id)
    except Exception as e:
        # Previously unguarded: a failure here escaped despite "Never raises".
        logger.warning("digest try_autolink link failed for %s: %s", digest_id, e)
        return None
    return str(case_law_id)


async def ingest_digest(
    *,
    file_path: str | Path,
    yomon_number: str = "",
    digest_date: date | str | None = None,
    practice_area: str = "",
    appeal_subtype: str = "",
    subject_tags: list[str] | None = None,
    progress: ProgressCb | None = None,
) -> dict:
    """Ingest one digest. **MCP-tool-only** (uses the local LLM extractor).

    User-supplied args win over LLM-extracted values for the same field (the
    chair typed them deliberately); empty args are filled from the LLM.

    Idempotency: if a row with the same content hash of the extracted text
    already exists, nothing is created and that row is reported with
    ``status="exists"``. NOTE(review): the INV-G3 contract also names
    yomon_number, which is not checked here — presumably enforced by
    ``db.create_digest``; confirm.

    Args:
        file_path: Source file to ingest; must exist.
        yomon_number, digest_date, practice_area, appeal_subtype, subject_tags:
            Optional overrides for the LLM-extracted metadata.
        progress: Optional async callback ``(status, percent, message)``.

    Returns:
        For a duplicate: ``status="exists"``, ``digest_id``, ``yomon_number``,
        ``linked_case_law_id``. Otherwise: ``status="completed"``,
        ``digest_id``, ``yomon_number``, ``underlying_citation``,
        ``linked_case_law_id``, ``fields_extracted`` and ``embedded`` (False
        when no vector could be stored).

    Raises:
        ValueError: Invalid ``practice_area``, missing file, or no
            extractable text.
    """
    progress = progress or _noop_progress
    if practice_area and practice_area not in _VALID_PRACTICE_AREAS:
        raise ValueError(f"invalid practice_area: {practice_area!r}")
    src = Path(file_path)
    if not src.exists():
        raise ValueError(f"file not found: {file_path}")

    await progress("staging", 5, "מעתיק קובץ")
    # NOTE(review): on the empty-text and duplicate paths below, the staged
    # file is left in place — confirm whether it should be cleaned up.
    staged = ingest._stage_file(src, DIGEST_LIBRARY_DIR, "incoming")
    rel_path = _relative_to_data_dir(staged)

    await progress("extracting_text", 20, "מחלץ טקסט")
    raw_text, _page_count, _offsets = await extractor.extract_text(str(staged))
    raw_text = (raw_text or "").strip()
    if not raw_text:
        raise ValueError("no text extracted from digest")

    # Idempotency: identical text already ingested → return existing row.
    content_hash = db._content_hash(raw_text)
    existing = await db.get_digest_by_content_hash(content_hash)
    if existing:
        await progress("completed", 100, "יומון זהה כבר קיים — לא נוצר כפל")
        return {
            "status": "exists",
            "digest_id": existing["id"],
            "yomon_number": existing.get("yomon_number", ""),
            "linked_case_law_id": existing.get("linked_case_law_id"),
        }

    # LLM metadata extraction (lazy import — keeps this module container-safe).
    await progress("extracting_metadata", 45, "מחלץ מטא-דאטה (LLM)")
    from legal_mcp.services import digest_metadata_extractor

    extracted = await digest_metadata_extractor.extract(raw_text)

    fields = _merge_fields(
        extracted,
        raw_text=raw_text,
        rel_path=rel_path,
        yomon_number=yomon_number,
        digest_date=digest_date,
        practice_area=practice_area,
        appeal_subtype=appeal_subtype,
        subject_tags=subject_tags,
    )

    await progress("storing", 70, "שומר רשומה")
    record = await db.create_digest(**fields)
    digest_id = record["id"]

    # Single embedding for the whole digest (atomic discovery unit — X12 §6).
    await progress("embedding", 85, "מחשב embedding")
    embedded = await _store_embedding(digest_id, fields)

    # Bridge to the underlying ruling if it is already in the library (INV-DIG3).
    await progress("linking", 95, "מנסה לקשר לפסק המקורי")
    linked_id = await try_autolink(digest_id, fields["underlying_citation"])

    await db.update_digest(digest_id, extraction_status="completed")
    await progress("completed", 100, "הושלם")
    return {
        "status": "completed",
        "digest_id": digest_id,
        "yomon_number": fields["yomon_number"],
        "underlying_citation": fields["underlying_citation"],
        "linked_case_law_id": linked_id,
        "fields_extracted": sorted(extracted.keys()),
        "embedded": embedded,
    }


async def link_digest(digest_id: UUID | str, case_law_id: UUID | str) -> dict:
    """Manually link a digest to an underlying ruling (INV-DIG3). Idempotent.

    Raises:
        ValueError: Unknown digest, malformed ``case_law_id``, or unknown
            ruling.
    """
    digest = await db.get_digest(digest_id)
    if not digest:
        raise ValueError("digest not found")
    # Normalize once so the id that is validated is the id that gets linked.
    ruling_id = case_law_id if isinstance(case_law_id, UUID) else UUID(str(case_law_id))
    ruling = await db.get_case_law(ruling_id)
    if not ruling:
        raise ValueError("case_law not found")
    updated = await db.link_digest_to_case_law(digest_id, ruling_id)
    return {
        "linked": True,
        "digest_id": str(digest_id),
        "case_law_id": str(ruling_id),
        "case_number": ruling.get("case_number"),
        "digest": updated,
    }


async def relink_digest(digest_id: UUID | str) -> dict:
    """Re-run autolink for a digest whose underlying ruling may now be in the library.

    No-op if already linked or no match found. ``case_law_id`` in the result
    is always a ``str`` (or None).

    Raises:
        ValueError: Unknown digest.
    """
    digest = await db.get_digest(digest_id)
    if not digest:
        raise ValueError("digest not found")
    current = digest.get("linked_case_law_id")
    if current:
        return {
            "linked": True,
            "digest_id": str(digest_id),
            "case_law_id": str(current),
            "changed": False,
        }
    linked_id = await try_autolink(digest_id, digest.get("underlying_citation", ""))
    return {
        "linked": linked_id is not None,
        "digest_id": str(digest_id),
        "case_law_id": linked_id,
        "changed": linked_id is not None,
    }


async def search_digests(
    query: str,
    practice_area: str = "",
    subject_tag: str = "",
    concept_tag: str = "",
    limit: int = 10,
) -> list[dict]:
    """Semantic search over the digests radar. Container-safe (voyage + DB).

    Returns ``[]`` for a blank query without calling the embedding service.
    """
    if not (query or "").strip():
        return []
    query_vec = await embeddings.embed_query(query)
    return await db.search_digests_semantic(
        query_embedding=query_vec,
        practice_area=practice_area,
        subject_tag=subject_tag,
        concept_tag=concept_tag,
        limit=limit,
    )


async def get_digest(digest_id: UUID | str) -> dict | None:
    """Fetch one digest row by id (thin pass-through to ``db.get_digest``)."""
    return await db.get_digest(digest_id)


async def list_digests(
    practice_area: str = "",
    concept_tag: str = "",
    linked: bool | None = None,
    search: str = "",
    limit: int = 100,
    offset: int = 0,
) -> list[dict]:
    """List digests with optional filters (thin pass-through to ``db.list_digests``)."""
    return await db.list_digests(
        practice_area=practice_area,
        concept_tag=concept_tag,
        linked=linked,
        search=search,
        limit=limit,
        offset=offset,
    )


async def delete_digest(digest_id: UUID | str) -> bool:
    """Delete a digest (thin pass-through to ``db.delete_digest``)."""
    return await db.delete_digest(digest_id)