"""Canonical ingest pipeline (FU-1). One pipeline for all sibling-entity intake types (external precedent, internal committee decision). Per-type variation rides on an ``IntakeSpec`` config object — never a parallel function. See docs/spec/01-ingest.md and docs/superpowers/specs/2026-05-30-fu1-unified-ingest-design.md. claude_session rule preserved: this module only QUEUES extraction (``request_*_extraction`` = pure DB writes). It never imports halacha_extractor / precedent_metadata_extractor, so it is safe to call from the FastAPI container where the ``claude`` CLI is unavailable. """ from __future__ import annotations import asyncio import logging import mimetypes import re from dataclasses import dataclass from datetime import date from pathlib import Path from typing import Awaitable, Callable from uuid import UUID, uuid4 from legal_mcp import config from legal_mcp.services import chunker, db, embeddings, extractor, storage logger = logging.getLogger(__name__) ProgressCb = Callable[[str, int, str], Awaitable[None]] async def _noop_progress(_status: str, _percent: int, _msg: str) -> None: return None @dataclass(frozen=True) class IntakeSpec: """Describes everything that varies between intake types.""" source_kind: str id_field: str staging_root: Path staging_subdir: Callable[[dict], str] validate: Callable[[dict], None] enum_fields: dict[str, frozenset[str]] derive: Callable[[dict], dict] display_name_fallback: str create_record: Callable[..., Awaitable[dict]] def _coerce_date(value) -> date | None: if value is None or value == "": return None if isinstance(value, date): return value if isinstance(value, str): try: return date.fromisoformat(value[:10]) except ValueError: return None return None def _safe_filename(name: str) -> str: base = Path(name).name return re.sub(r"[^\w.\-+א-ת ]", "_", base) or f"upload-{uuid4().hex[:8]}" async def _stage_file(src_path: Path, root: Path, subdir: str) -> Path: """Stage an intake file through the unified storage layer (INV-STG1). Returns the DATA_DIR path the rest of the pipeline reads from — under the filesystem/dual backends the bytes are on disk and the key is the DATA_DIR-relative path. The Hebrew original filename rides as object metadata, never as the key (INV-STG2).""" dest = root / (subdir or "other") / f"{uuid4().hex[:8]}_{_safe_filename(src_path.name)}" key = dest.relative_to(config.DATA_DIR).as_posix() await storage.put_file( src_path, key, bucket=storage.Bucket.DOCUMENTS, content_type=mimetypes.guess_type(src_path.name)[0], metadata={"filename": src_path.name}, ) return dest def _validate_enums(spec: IntakeSpec, inputs: dict) -> None: for field_name, allowed in spec.enum_fields.items(): value = inputs.get(field_name, "") or "" if value not in allowed: raise ValueError(f"invalid {field_name}: {value!r}") async def _embed_pages(case_law_id: UUID, pdf_path: Path, page_count: int) -> dict: """Render PDF pages → embed via voyage-multimodal → store. Non-fatal caller.""" thumb_dir = spec_thumb_dir(case_law_id) rendered = await asyncio.to_thread( extractor.render_pages_for_multimodal, pdf_path, config.MULTIMODAL_DPI, config.MULTIMODAL_THUMB_DPI, thumb_dir, ) images = [pil for pil, _ in rendered] thumbs = [t for _, t in rendered] img_embs = await embeddings.embed_images(images) page_records = [] for i, (emb, thumb) in enumerate(zip(img_embs, thumbs)): rel_thumb = None if thumb is not None: try: rel_thumb = str(thumb.relative_to(config.DATA_DIR)) except ValueError: rel_thumb = str(thumb) page_records.append({ "page_number": i + 1, "embedding": emb, "image_thumbnail_path": rel_thumb, }) stored = await db.store_precedent_image_embeddings( case_law_id, page_records, model_name=config.MULTIMODAL_MODEL, ) logger.info("Multimodal: stored %d page-image embeddings for case_law %s", stored, case_law_id) return {"pages_embedded": stored} def spec_thumb_dir(case_law_id: UUID) -> Path: """Thumbnails live under the precedent-library tree regardless of intake type.""" return Path(config.DATA_DIR) / "precedent-library" / "thumbnails" / str(case_law_id) async def ingest_document( spec: IntakeSpec, *, inputs: dict, file_path: str | Path | None = None, text: str | None = None, document_id: UUID | None = None, progress: ProgressCb | None = None, ) -> dict: """Run the canonical 12-step pipeline for one intake item. ``inputs`` carries the type-specific record fields (citation/case_number, case_name, court, practice_area, etc.). ``spec`` decides how they are validated, staged, derived, and which DB-create runs. Returns a dict with at least: status, case_law_id, chunks. """ progress = progress or _noop_progress # Step 1: input validation (type-specific) + enums (uniform mechanism). if not file_path and text is None: raise ValueError("either file_path or text is required") spec.validate(inputs) _validate_enums(spec, inputs) # Step 2: field derivation (identity for external). inputs = {**inputs, **spec.derive(inputs)} # Steps 3-5: stage (if file) + extract + strip. page_count = 0 page_offsets = None staged: Path | None = None if file_path: src = Path(file_path) if not src.is_file(): raise FileNotFoundError(f"file not found: {src}") await progress("staging", 5, "מעתיק את הקובץ לאחסון") staged = await _stage_file(src, spec.staging_root, spec.staging_subdir(inputs)) await progress("extracting", 15, "מחלץ טקסט מהקובץ") try: raw_text, page_count, page_offsets = await extractor.extract_text(str(staged)) except Exception as e: await progress("failed", 100, f"כשל בחילוץ טקסט: {e}") raise raw_text = (raw_text or "") else: raw_text = (text or "") # Capture the Nevo מיני-רציו (editorial holdings summary) BEFORE stripping # it out — it is a free professional gold-set for benchmarking halacha # extraction (#86.3). Stored on the case_law row below once we have its id. nevo_ratio = extractor.extract_nevo_ratio(raw_text) raw_text = extractor.strip_nevo_preamble(raw_text).strip() if not raw_text: await progress("failed", 100, "לא נמצא טקסט בקובץ") raise ValueError("no extractable text in file") # Step 6: DB create (type-specific, routed — get case_law_id). await progress("storing_metadata", 25, "שומר את הרשומה במסד הנתונים") display_name = (inputs.get("case_name") or "").strip() or ( inputs.get(spec.display_name_fallback) or "" ).strip() record = await spec.create_record( full_text=raw_text, case_name=display_name, decision_date=_coerce_date(inputs.get("decision_date")), document_id=document_id, **{k: v for k, v in inputs.items() if k not in {"case_name", "decision_date", "file_path", "text"}}, ) case_law_id = UUID(str(record["id"])) # Persist the captured mini-ratio (best-effort; never block ingest on it). if nevo_ratio: try: await db.update_case_law(case_law_id, nevo_ratio=nevo_ratio) except Exception as e: # noqa: BLE001 — additive metadata, non-fatal logger.warning("could not store nevo_ratio for %s: %s", case_law_id, e) try: stored_chunks = await _chunk_embed_store(case_law_id, raw_text, page_offsets, page_count, progress) await db.mark_indexed(case_law_id) # Step 9: multimodal — uniform: flag + PDF + page_count, NOT intake type. if (config.MULTIMODAL_ENABLED and page_count > 0 and staged is not None and staged.suffix.lower() == ".pdf"): try: await progress("embedding_images", 70, f"מטמיע {page_count} עמודי תמונה (multimodal)") await _embed_pages(case_law_id, staged, page_count) except Exception as e: logger.warning("Multimodal embedding failed (non-fatal): %s", e) # Steps 10-12: queue BOTH extractions (GAP-02 fix) + statuses. await db.set_case_law_extraction_status(case_law_id, "completed") await db.set_case_law_halacha_status(case_law_id, "pending") await db.request_metadata_extraction(case_law_id) await db.request_halacha_extraction(case_law_id) await db.recompute_searchable(case_law_id) await progress("completed", 100, f"נקלט: {stored_chunks} chunks. חילוץ הלכות ומטא-דאטה ממתינים בתור.") return { "status": "completed", "case_law_id": str(case_law_id), "chunks": stored_chunks, "halachot": 0, "halachot_pending": True, "metadata_filled": [], "pages": page_count, } except Exception as e: logger.exception("ingest_document failed (%s): %s", spec.source_kind, e) await db.set_case_law_extraction_status(case_law_id, "failed") await progress("failed", 100, f"כשל בעיבוד: {e}") raise async def _chunk_embed_store(case_law_id, text, page_offsets, page_count, progress) -> int: """Steps 7-8: chunk (hierarchical/flat by flag) → embed children → store.""" if config.PARENT_DOC_RETRIEVAL_ENABLED: await progress("chunking", 40, f"מחלק את הטקסט ל-chunks היררכיים ({page_count} עמ')") h_chunks = chunker.chunk_document_hierarchical(text, page_offsets=page_offsets) if not h_chunks: return 0 children = [c for c in h_chunks if c.role == "child"] parents = [c for c in h_chunks if c.role == "parent"] await progress("embedding", 55, f"מייצר embeddings ל-{len(children)} children ({len(parents)} parents)") child_vectors = await embeddings.embed_texts([c.content for c in children], input_type="document") chunk_dicts: list[dict] = [] for p in parents: chunk_dicts.append({ "role": "parent", "local_id": p.local_id, "parent_local_id": None, "chunk_index": p.chunk_index, "content": p.content, "section_type": p.section_type, "page_number": p.page_number, "embedding": None, }) for c, v in zip(children, child_vectors): chunk_dicts.append({ "role": "child", "local_id": c.local_id, "parent_local_id": c.parent_local_id, "chunk_index": c.chunk_index, "content": c.content, "section_type": c.section_type, "page_number": c.page_number, "embedding": v, }) counts = await db.store_precedent_chunks_hierarchical(case_law_id, chunk_dicts) return counts["children"] else: await progress("chunking", 40, f"מחלק את הטקסט ל-chunks ({page_count} עמ')") chunks = chunker.chunk_document(text, page_offsets=page_offsets) if not chunks: return 0 await progress("embedding", 55, f"מייצר embeddings ל-{len(chunks)} chunks") chunk_vectors = await embeddings.embed_texts([c.content for c in chunks], input_type="document") chunk_dicts = [ {"chunk_index": c.chunk_index, "content": c.content, "section_type": c.section_type, "page_number": c.page_number, "embedding": v} for c, v in zip(chunks, chunk_vectors) ] return await db.store_precedent_chunks(case_law_id, chunk_dicts) async def reindex_case_law( case_law_id: "UUID | str", progress: ProgressCb | None = None, ) -> dict: """Re-chunk + re-embed an existing case_law row from its STORED full_text (GAP-09). No re-extract / no re-OCR (uses the stored text — see feedback_no_reocr_retrofit) and no LLM/CLI (only chunker + voyage embeddings), so it is safe to run anywhere. Idempotent: store_precedent_chunks(_hierarchical) is DELETE-then-INSERT. """ progress = progress or _noop_progress cid = case_law_id if isinstance(case_law_id, UUID) else UUID(str(case_law_id)) row = await db.get_case_law(cid) if not row: raise ValueError(f"case_law not found: {cid}") text = (row.get("full_text") or "").strip() if not text: raise ValueError("case_law has no stored full_text to re-index") stored = await _chunk_embed_store(cid, text, None, 0, progress) await db.mark_indexed(cid) await progress("completed", 100, f"הוטמע מחדש: {stored} chunks") return {"status": "completed", "case_law_id": str(cid), "chunks": stored, "reindexed": True}