"""Canonical ingest pipeline (FU-1).

One pipeline for all sibling-entity intake types (external precedent,
internal committee decision). Per-type variation rides on an ``IntakeSpec``
config object — never a parallel function. See
docs/spec/01-ingest.md and docs/superpowers/specs/2026-05-30-fu1-unified-ingest-design.md.

claude_session rule preserved: this module only QUEUES extraction
(``request_*_extraction`` = pure DB writes). It never imports
halacha_extractor / precedent_metadata_extractor, so it is safe to call
from the FastAPI container where the ``claude`` CLI is unavailable.
"""
from __future__ import annotations

import asyncio
import logging
import re
import shutil
from dataclasses import dataclass
from datetime import date, datetime
from pathlib import Path
from typing import Awaitable, Callable
from uuid import UUID, uuid4

from legal_mcp import config
from legal_mcp.services import chunker, db, embeddings, extractor

logger = logging.getLogger(__name__)

# Async progress callback, invoked as ``cb(status, percent, message)``.
ProgressCb = Callable[[str, int, str], Awaitable[None]]


async def _noop_progress(_status: str, _percent: int, _msg: str) -> None:
    """Progress callback that ignores every update."""
    return None


@dataclass(frozen=True)
class IntakeSpec:
    """Describes everything that varies between intake types."""

    # Label for the intake type — presumably stored with the record; verify
    # against the pipeline that consumes the spec.
    source_kind: str
    # Name of the identifier field for this intake type (usage not visible
    # in this module — TODO confirm).
    id_field: str
    # Staging location: presumably the ``root`` / ``subdir`` handed to
    # ``_stage_file``; the subdir is computed from the intake inputs.
    staging_root: Path
    staging_subdir: Callable[[dict], str]
    # Type-specific validation hook taking the inputs dict — presumably
    # raises on invalid input, as ``_validate_enums`` does.
    validate: Callable[[dict], None]
    # Field name -> set of permitted values; enforced by ``_validate_enums``.
    enum_fields: dict[str, frozenset[str]]
    # Maps the inputs dict to another dict (presumably derived fields).
    derive: Callable[[dict], dict]
    # Presumably the display name used when the inputs do not supply one.
    display_name_fallback: str
    # Async factory returning a dict (presumably the created DB record).
    create_record: Callable[..., Awaitable[dict]]


def _coerce_date(value) -> date | None:
    """Best-effort conversion of *value* to a ``date``.

    Args:
        value: ``None``, an empty string, a ``date``/``datetime`` instance,
            or a string whose first ten characters (after stripping
            surrounding whitespace) form an ISO date.

    Returns:
        The corresponding ``date``, or ``None`` when *value* is empty,
        unparseable, or of an unsupported type. Never raises for bad input.
    """
    if value is None or value == "":
        return None
    # ``datetime`` subclasses ``date``; check it first and narrow it so the
    # function really returns a plain ``date`` as annotated.
    if isinstance(value, datetime):
        return value.date()
    if isinstance(value, date):
        return value
    if isinstance(value, str):
        try:
            # Keep only the leading YYYY-MM-DD part so full ISO timestamps
            # ("2024-01-31T10:00:00") are accepted as well.
            return date.fromisoformat(value.strip()[:10])
        except ValueError:
            return None
    return None


def _safe_filename(name: str) -> str:
    """Return *name* reduced to its final path component with unsafe chars replaced.

    Every character that is not a word character, ``.``, ``-``, ``+``, a
    Hebrew letter or a space becomes ``_``. If nothing is left (e.g. *name*
    is empty), a random ``upload-<8 hex chars>`` name is returned instead.
    """
    base = Path(name).name
    return re.sub(r"[^\w.\-+א-ת ]", "_", base) or f"upload-{uuid4().hex[:8]}"


def _stage_file(src_path: Path, root: Path, subdir: str) -> Path:
    """Copy *src_path* into ``root/subdir`` under a unique name and return the copy.

    Args:
        src_path: File to copy.
        root: Base staging directory.
        subdir: Directory below *root*; an empty value falls back to
            ``"other"``.

    Returns:
        Path of the staged copy, named ``<8 hex chars>_<sanitised name>``.
    """
    # NOTE(review): ``subdir`` is joined verbatim, so an absolute path or a
    # ``..`` component would land outside ``root`` — confirm callers only pass
    # trusted values.
    dest_dir = root / (subdir or "other")
    dest_dir.mkdir(parents=True, exist_ok=True)
    # The random prefix keeps uploads with identical names from colliding.
    dest = dest_dir / f"{uuid4().hex[:8]}_{_safe_filename(src_path.name)}"
    shutil.copy2(src_path, dest)
    return dest


def _validate_enums(spec: IntakeSpec, inputs: dict) -> None:
    """Check every enum-constrained field of *inputs* against *spec*.

    A missing or falsy value is treated as ``""``, so a field is optional
    only when ``""`` is part of its allowed set.

    Raises:
        ValueError: If a field's value is not in its allowed set. Unhashable
            values (e.g. a list or dict) are reported the same way.
    """
    for field_name, allowed in spec.enum_fields.items():
        value = inputs.get(field_name, "") or ""
        try:
            is_allowed = value in allowed
        except TypeError:
            # Unhashable values cannot be members of a frozenset; report them
            # as invalid input instead of leaking a TypeError.
            is_allowed = False
        if not is_allowed:
            raise ValueError(f"invalid {field_name}: {value!r}")


async def _embed_pages(case_law_id: UUID, pdf_path: Path, page_count: int) -> dict:
    """Render PDF pages → embed via voyage-multimodal → store. Non-fatal caller.

    Args:
        case_law_id: Record the page embeddings are stored for.
        pdf_path: PDF whose pages are rendered.
        page_count: Currently unused here; kept for interface compatibility.

    Returns:
        ``{"pages_embedded": n}`` where ``n`` is the value returned by
        ``db.store_precedent_image_embeddings``.
    """
    thumb_dir = spec_thumb_dir(case_law_id)
    # Rendering is blocking work, so it runs in a worker thread.
    rendered = await asyncio.to_thread(
        extractor.render_pages_for_multimodal,
        pdf_path, config.MULTIMODAL_DPI, config.MULTIMODAL_THUMB_DPI, thumb_dir,
    )
    images = [pil for pil, _ in rendered]
    thumbs = [t for _, t in rendered]
    img_embs = await embeddings.embed_images(images)

    page_records = []
    for page_number, (emb, thumb) in enumerate(zip(img_embs, thumbs), start=1):
        rel_thumb = None
        if thumb is not None:
            try:
                # Prefer a DATA_DIR-relative path for the thumbnail.
                rel_thumb = str(thumb.relative_to(config.DATA_DIR))
            except ValueError:
                # Thumbnail is outside DATA_DIR: keep the path as given.
                rel_thumb = str(thumb)
        page_records.append({
            "page_number": page_number, "embedding": emb, "image_thumbnail_path": rel_thumb,
        })
    if len(page_records) != len(thumbs):
        # zip() stops at the shorter input, so trailing pages would otherwise
        # be dropped without any trace.
        logger.warning(
            "Multimodal: rendered %d pages but got %d embeddings for case_law %s",
            len(thumbs), len(page_records), case_law_id,
        )
    stored = await db.store_precedent_image_embeddings(
        case_law_id, page_records, model_name=config.MULTIMODAL_MODEL,
    )
    logger.info("Multimodal: stored %d page-image embeddings for case_law %s", stored, case_law_id)
    return {"pages_embedded": stored}


def spec_thumb_dir(case_law_id: UUID) -> Path:
    """Thumbnails live under the precedent-library tree regardless of intake type."""
    return Path(config.DATA_DIR) / "precedent-library" / "thumbnails" / str(case_law_id)