feat(ingest): IntakeSpec + shared helpers for canonical pipeline (FU-1)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-30 19:11:27 +00:00
parent 9ae2d47d03
commit d4663eba8f

View File

@@ -0,0 +1,115 @@
"""Canonical ingest pipeline (FU-1).
One pipeline for all sibling-entity intake types (external precedent,
internal committee decision). Per-type variation rides on an ``IntakeSpec``
config object — never a parallel function. See
docs/spec/01-ingest.md and docs/superpowers/specs/2026-05-30-fu1-unified-ingest-design.md.
claude_session rule preserved: this module only QUEUES extraction
(``request_*_extraction`` = pure DB writes). It never imports
halacha_extractor / precedent_metadata_extractor, so it is safe to call
from the FastAPI container where the ``claude`` CLI is unavailable.
"""
from __future__ import annotations
import asyncio
import logging
import re
import shutil
from dataclasses import dataclass
from datetime import date
from pathlib import Path
from typing import Awaitable, Callable
from uuid import UUID, uuid4
from legal_mcp import config
from legal_mcp.services import chunker, db, embeddings, extractor
logger = logging.getLogger(__name__)
ProgressCb = Callable[[str, int, str], Awaitable[None]]
async def _noop_progress(_status: str, _percent: int, _msg: str) -> None:
return None
@dataclass(frozen=True)
class IntakeSpec:
"""Describes everything that varies between intake types."""
source_kind: str
id_field: str
staging_root: Path
staging_subdir: Callable[[dict], str]
validate: Callable[[dict], None]
enum_fields: dict[str, frozenset[str]]
derive: Callable[[dict], dict]
display_name_fallback: str
create_record: Callable[..., Awaitable[dict]]
def _coerce_date(value) -> date | None:
if value is None or value == "":
return None
if isinstance(value, date):
return value
if isinstance(value, str):
try:
return date.fromisoformat(value[:10])
except ValueError:
return None
return None
def _safe_filename(name: str) -> str:
base = Path(name).name
return re.sub(r"[^\w.\-+א-ת ]", "_", base) or f"upload-{uuid4().hex[:8]}"
def _stage_file(src_path: Path, root: Path, subdir: str) -> Path:
dest_dir = root / (subdir or "other")
dest_dir.mkdir(parents=True, exist_ok=True)
dest = dest_dir / f"{uuid4().hex[:8]}_{_safe_filename(src_path.name)}"
shutil.copy2(src_path, dest)
return dest
def _validate_enums(spec: IntakeSpec, inputs: dict) -> None:
for field_name, allowed in spec.enum_fields.items():
value = inputs.get(field_name, "") or ""
if value not in allowed:
raise ValueError(f"invalid {field_name}: {value!r}")
async def _embed_pages(case_law_id: UUID, pdf_path: Path, page_count: int) -> dict:
"""Render PDF pages → embed via voyage-multimodal → store. Non-fatal caller."""
thumb_dir = spec_thumb_dir(case_law_id)
rendered = await asyncio.to_thread(
extractor.render_pages_for_multimodal,
pdf_path, config.MULTIMODAL_DPI, config.MULTIMODAL_THUMB_DPI, thumb_dir,
)
images = [pil for pil, _ in rendered]
thumbs = [t for _, t in rendered]
img_embs = await embeddings.embed_images(images)
page_records = []
for i, (emb, thumb) in enumerate(zip(img_embs, thumbs)):
rel_thumb = None
if thumb is not None:
try:
rel_thumb = str(thumb.relative_to(config.DATA_DIR))
except ValueError:
rel_thumb = str(thumb)
page_records.append({
"page_number": i + 1, "embedding": emb, "image_thumbnail_path": rel_thumb,
})
stored = await db.store_precedent_image_embeddings(
case_law_id, page_records, model_name=config.MULTIMODAL_MODEL,
)
logger.info("Multimodal: stored %d page-image embeddings for case_law %s", stored, case_law_id)
return {"pages_embedded": stored}
def spec_thumb_dir(case_law_id: UUID) -> Path:
"""Thumbnails live under the precedent-library tree regardless of intake type."""
return Path(config.DATA_DIR) / "precedent-library" / "thumbnails" / str(case_law_id)