Files
legal-ai/docs/superpowers/plans/2026-05-30-fu1-unified-ingest.md
2026-05-30 19:05:14 +00:00

33 KiB

FU-1 Unified Ingest Path — Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Collapse the two parallel ingest functions (ingest_precedent, ingest_internal_decision) into one canonical pipeline parameterized by an IntakeSpec, closing GAP-01/02/04/05.

Architecture: New module services/ingest.py holds a Template-Method skeleton ingest_document(spec, ...); per-type variation rides on a frozen IntakeSpec config object (staging resolver, validate callable, enum_fields data, derive callable, display-name fallback, injected create_record). The two existing public functions stay as named entry points that build a spec and delegate. The DB-create functions are NOT merged (FU-2 boundary) — only routed via spec.create_record.

Tech Stack: Python 3.12, asyncpg, pytest (offline, monkeypatched I/O), local .venv at mcp-server/.venv.

Spec: docs/superpowers/specs/2026-05-30-fu1-unified-ingest-design.md

Run tests with: cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_unified_ingest.py -v


File Structure

  • Create mcp-server/src/legal_mcp/services/ingest.py — canonical pipeline + IntakeSpec + shared helpers (_stage_file, _coerce_date, _safe_filename, _embed_pages).
  • Create mcp-server/tests/test_unified_ingest.py — offline behavioral tests.
  • Modify mcp-server/src/legal_mcp/services/precedent_library.pyingest_precedent becomes a thin wrapper building _EXTERNAL_SPEC; delete inline pipeline + moved helpers; keep everything else (search, reextract, process_pending, list, delete, get).
  • Modify mcp-server/src/legal_mcp/services/internal_decisions.pyingest_internal_decision becomes a thin wrapper building _INTERNAL_SPEC; delete inline pipeline + moved helpers; keep migrate_, enrich_, search_internal.

Unchanged callers (verify, don't edit): tools/precedent_library.py, tools/internal_decisions.py, web/ HTTP handlers — they call the two public functions whose signatures are preserved.


Task 1: Failing tests for the unified pipeline

Files:

  • Test: mcp-server/tests/test_unified_ingest.py

  • Step 1: Write the failing tests

"""FU-1: unified ingest pipeline tests (offline, all I/O monkeypatched).

Proves both intake types flow through services.ingest.ingest_document and that
the canonical pipeline is symmetric: BOTH metadata and halacha extraction are
queued for BOTH types (GAP-02 regression), enum validation applies to both
(GAP-04), multimodal is gated by flag+PDF not by intake type (GAP-05), and the
external citation guard is preserved.
"""
from __future__ import annotations

import asyncio
from pathlib import Path
from uuid import uuid4

import pytest

from legal_mcp import config
from legal_mcp.services import db, embeddings, chunker, extractor
from legal_mcp.services import ingest, precedent_library, internal_decisions


def _run(coro):
    return asyncio.run(coro)


class _Chunk:
    def __init__(self, i):
        self.chunk_index = i
        self.content = f"chunk-{i}"
        self.section_type = "body"
        self.page_number = 1
        self.role = "child"
        self.local_id = f"c{i}"
        self.parent_local_id = None


@pytest.fixture()
def patched(monkeypatch, tmp_path):
    """Patch every I/O boundary. Record queue + create calls."""
    calls = {"metadata": [], "halacha": [], "create": [], "chunks": [], "pages": []}

    async def _extract_text(path):
        return ("full decision text", 2, [0, 100])

    def _strip(text):
        return text

    def _chunk(text, page_offsets=None):
        return [_Chunk(0), _Chunk(1)]

    async def _embed(texts, input_type="document"):
        return [[0.0] * 8 for _ in texts]

    async def _store_chunks(cid, dicts):
        calls["chunks"].append((cid, len(dicts)))
        return len(dicts)

    async def _create_external(**kw):
        calls["create"].append(("external", kw))
        return {"id": uuid4()}

    async def _create_internal(**kw):
        calls["create"].append(("internal", kw))
        return {"id": uuid4()}

    async def _req_meta(cid):
        calls["metadata"].append(cid)

    async def _req_hal(cid):
        calls["halacha"].append(cid)

    async def _set_status(cid, status):
        return None

    monkeypatch.setattr(extractor, "extract_text", _extract_text)
    monkeypatch.setattr(extractor, "strip_nevo_preamble", _strip)
    monkeypatch.setattr(chunker, "chunk_document", _chunk)
    monkeypatch.setattr(embeddings, "embed_texts", _embed)
    monkeypatch.setattr(db, "store_precedent_chunks", _store_chunks)
    monkeypatch.setattr(db, "create_external_case_law", _create_external)
    monkeypatch.setattr(db, "create_internal_committee_decision", _create_internal)
    monkeypatch.setattr(db, "request_metadata_extraction", _req_meta)
    monkeypatch.setattr(db, "request_halacha_extraction", _req_hal)
    monkeypatch.setattr(db, "set_case_law_extraction_status", _set_status)
    monkeypatch.setattr(db, "set_case_law_halacha_status", _set_status)
    # Force flat chunking + multimodal OFF unless a test flips it.
    monkeypatch.setattr(config, "PARENT_DOC_RETRIEVAL_ENABLED", False)
    monkeypatch.setattr(config, "MULTIMODAL_ENABLED", False)
    return calls


def _make_pdf(tmp_path) -> str:
    p = tmp_path / "decision.pdf"
    p.write_bytes(b"%PDF-1.4 fake")
    return str(p)


def test_internal_queues_BOTH_metadata_and_halacha(patched, tmp_path):
    """GAP-02 regression: the internal path must queue metadata too."""
    _run(internal_decisions.ingest_internal_decision(
        case_number="8046/24", text="decision text", chair_name="דפנה תמיר",
        district="ירושלים", practice_area="betterment_levy",
    ))
    assert len(patched["metadata"]) == 1, "internal path must queue metadata (GAP-02)"
    assert len(patched["halacha"]) == 1


def test_external_queues_both(patched, tmp_path):
    _run(precedent_library.ingest_precedent(
        file_path=_make_pdf(tmp_path), citation="עע\"מ 1234/20",
        practice_area="rishuy_uvniya", source_type="court_ruling",
    ))
    assert len(patched["metadata"]) == 1
    assert len(patched["halacha"]) == 1


def test_both_types_go_through_ingest_document(patched, tmp_path, monkeypatch):
    seen = []
    real = ingest.ingest_document

    async def _spy(spec, **kw):
        seen.append(spec.source_kind)
        return await real(spec, **kw)

    monkeypatch.setattr(ingest, "ingest_document", _spy)
    _run(internal_decisions.ingest_internal_decision(
        case_number="8046/24", text="t", chair_name="דפנה תמיר", practice_area="betterment_levy"))
    _run(precedent_library.ingest_precedent(
        file_path=_make_pdf(tmp_path), citation="עע\"מ 1/20", practice_area="rishuy_uvniya"))
    assert seen == ["internal_committee", "external_upload"]


def test_enum_validation_rejects_bad_practice_area_internal(patched, tmp_path):
    """GAP-04: internal path must validate enums like the external one."""
    with pytest.raises(ValueError, match="practice_area"):
        _run(internal_decisions.ingest_internal_decision(
            case_number="8046/24", text="t", chair_name="x", practice_area="bogus"))


def test_enum_validation_rejects_bad_practice_area_external(patched, tmp_path):
    with pytest.raises(ValueError, match="practice_area"):
        _run(precedent_library.ingest_precedent(
            file_path=_make_pdf(tmp_path), citation="עע\"מ 1/20", practice_area="bogus"))


def test_external_citation_guard_still_blocks_arar(patched, tmp_path):
    with pytest.raises(ValueError, match="ערר"):
        _run(precedent_library.ingest_precedent(
            file_path=_make_pdf(tmp_path), citation="ערר 1234/24"))


def test_internal_text_path_works_without_file(patched):
    out = _run(internal_decisions.ingest_internal_decision(
        case_number="8046/24", text="t", chair_name="x", practice_area="betterment_levy"))
    assert out["status"] == "completed"
    assert out["case_law_id"]


def test_internal_requires_file_or_text(patched):
    with pytest.raises(ValueError, match="file_path or text"):
        _run(internal_decisions.ingest_internal_decision(
            case_number="8046/24", chair_name="x", practice_area="betterment_levy"))


def test_display_name_fallback_uses_canonical_id(patched, tmp_path):
    _run(internal_decisions.ingest_internal_decision(
        case_number="8046/24", text="t", chair_name="x", practice_area="betterment_levy"))
    kind, kw = patched["create"][0]
    assert kw["case_name"] == "8046/24", "missing case_name falls back to canonical id"
  • Step 2: Run tests to verify they fail

Run: cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_unified_ingest.py -v Expected: FAIL — ModuleNotFoundError: No module named 'legal_mcp.services.ingest' (or ImportError).

  • Step 3: Commit the red tests
cd ~/legal-ai
git add mcp-server/tests/test_unified_ingest.py
git commit -m "test(ingest): failing tests for unified pipeline (FU-1)"

Task 2: Canonical module ingest.py — IntakeSpec + shared helpers

Files:

  • Create: mcp-server/src/legal_mcp/services/ingest.py

  • Step 1: Write the module header, IntakeSpec, and shared helpers

"""Canonical ingest pipeline (FU-1).

One pipeline for all sibling-entity intake types (external precedent,
internal committee decision). Per-type variation rides on an ``IntakeSpec``
config object — never a parallel function. See
docs/spec/01-ingest.md and docs/superpowers/specs/2026-05-30-fu1-unified-ingest-design.md.

claude_session rule preserved: this module only QUEUES extraction
(``request_*_extraction`` = pure DB writes). It never imports
halacha_extractor / precedent_metadata_extractor, so it is safe to call
from the FastAPI container where the ``claude`` CLI is unavailable.
"""
from __future__ import annotations

import asyncio
import logging
import re
import shutil
from dataclasses import dataclass
from datetime import date
from pathlib import Path
from typing import Awaitable, Callable
from uuid import UUID, uuid4

from legal_mcp import config
from legal_mcp.services import chunker, db, embeddings, extractor

logger = logging.getLogger(__name__)

ProgressCb = Callable[[str, int, str], Awaitable[None]]


async def _noop_progress(_status: str, _percent: int, _msg: str) -> None:
    return None


@dataclass(frozen=True)
class IntakeSpec:
    """Describes everything that varies between intake types."""
    source_kind: str
    id_field: str
    staging_root: Path
    staging_subdir: Callable[[dict], str]
    validate: Callable[[dict], None]
    enum_fields: dict[str, frozenset[str]]
    derive: Callable[[dict], dict]
    display_name_fallback: str
    create_record: Callable[..., Awaitable[dict]]


def _coerce_date(value) -> date | None:
    if value is None or value == "":
        return None
    if isinstance(value, date):
        return value
    if isinstance(value, str):
        try:
            return date.fromisoformat(value[:10])
        except ValueError:
            return None
    return None


def _safe_filename(name: str) -> str:
    base = Path(name).name
    return re.sub(r"[^\w.\-+א-ת ]", "_", base) or f"upload-{uuid4().hex[:8]}"


def _stage_file(src_path: Path, root: Path, subdir: str) -> Path:
    dest_dir = root / (subdir or "other")
    dest_dir.mkdir(parents=True, exist_ok=True)
    dest = dest_dir / f"{uuid4().hex[:8]}_{_safe_filename(src_path.name)}"
    shutil.copy2(src_path, dest)
    return dest


def _validate_enums(spec: IntakeSpec, inputs: dict) -> None:
    for field_name, allowed in spec.enum_fields.items():
        value = inputs.get(field_name, "") or ""
        if value not in allowed:
            raise ValueError(f"invalid {field_name}: {value!r}")
  • Step 2: Add the multimodal page-embed helper (moved verbatim from precedent_library.py)
async def _embed_pages(case_law_id: UUID, pdf_path: Path, page_count: int) -> dict:
    """Render PDF pages → embed via voyage-multimodal → store. Non-fatal caller."""
    thumb_dir = spec_thumb_dir(case_law_id)
    rendered = await asyncio.to_thread(
        extractor.render_pages_for_multimodal,
        pdf_path, config.MULTIMODAL_DPI, config.MULTIMODAL_THUMB_DPI, thumb_dir,
    )
    images = [pil for pil, _ in rendered]
    thumbs = [t for _, t in rendered]
    img_embs = await embeddings.embed_images(images)

    page_records = []
    for i, (emb, thumb) in enumerate(zip(img_embs, thumbs)):
        rel_thumb = None
        if thumb is not None:
            try:
                rel_thumb = str(thumb.relative_to(config.DATA_DIR))
            except ValueError:
                rel_thumb = str(thumb)
        page_records.append({
            "page_number": i + 1, "embedding": emb, "image_thumbnail_path": rel_thumb,
        })
    stored = await db.store_precedent_image_embeddings(
        case_law_id, page_records, model_name=config.MULTIMODAL_MODEL,
    )
    logger.info("Multimodal: stored %d page-image embeddings for case_law %s", stored, case_law_id)
    return {"pages_embedded": stored}


def spec_thumb_dir(case_law_id: UUID) -> Path:
    """Thumbnails live under the precedent-library tree regardless of intake type."""
    return Path(config.DATA_DIR) / "precedent-library" / "thumbnails" / str(case_law_id)
  • Step 3: Verify the module imports cleanly

Run: cd ~/legal-ai/mcp-server && .venv/bin/python -c "from legal_mcp.services import ingest; print(ingest.IntakeSpec.__name__)" Expected: prints IntakeSpec, no error.

  • Step 4: Commit
cd ~/legal-ai
git add mcp-server/src/legal_mcp/services/ingest.py
git commit -m "feat(ingest): IntakeSpec + shared helpers for canonical pipeline (FU-1)"

Task 3: Canonical ingest_document

Files:

  • Modify: mcp-server/src/legal_mcp/services/ingest.py (append ingest_document)

  • Step 1: Append the canonical pipeline function

async def ingest_document(
    spec: IntakeSpec,
    *,
    inputs: dict,
    file_path: str | Path | None = None,
    text: str | None = None,
    document_id: UUID | None = None,
    progress: ProgressCb | None = None,
) -> dict:
    """Run the canonical 12-step pipeline for one intake item.

    ``inputs`` carries the type-specific record fields (citation/case_number,
    case_name, court, practice_area, etc.). ``spec`` decides how they are
    validated, staged, derived, and which DB-create runs. Returns a dict with
    at least: status, case_law_id, chunks.
    """
    progress = progress or _noop_progress

    # Step 1: input validation (type-specific) + enums (uniform mechanism).
    if not file_path and text is None:
        raise ValueError("either file_path or text is required")
    spec.validate(inputs)
    _validate_enums(spec, inputs)

    # Step 2: field derivation (identity for external).
    inputs = {**inputs, **spec.derive(inputs)}

    # Steps 3-5: stage (if file) + extract + strip.
    page_count = 0
    page_offsets = None
    staged: Path | None = None
    if file_path:
        src = Path(file_path)
        if not src.is_file():
            raise FileNotFoundError(f"file not found: {src}")
        await progress("staging", 5, "מעתיק את הקובץ לאחסון")
        staged = _stage_file(src, spec.staging_root, spec.staging_subdir(inputs))
        await progress("extracting", 15, "מחלץ טקסט מהקובץ")
        try:
            raw_text, page_count, page_offsets = await extractor.extract_text(str(staged))
        except Exception as e:
            await progress("failed", 100, f"כשל בחילוץ טקסט: {e}")
            raise
        raw_text = extractor.strip_nevo_preamble((raw_text or "")).strip()
    else:
        raw_text = (text or "").strip()
    if not raw_text:
        await progress("failed", 100, "לא נמצא טקסט בקובץ")
        raise ValueError("no extractable text in file")

    # Step 6: DB create (type-specific, routed — get case_law_id).
    await progress("storing_metadata", 25, "שומר את הרשומה במסד הנתונים")
    display_name = (inputs.get("case_name") or "").strip() or (
        inputs.get(spec.display_name_fallback) or ""
    ).strip()
    record = await spec.create_record(
        full_text=raw_text,
        case_name=display_name,
        decision_date=_coerce_date(inputs.get("decision_date")),
        document_id=document_id,
        **{k: v for k, v in inputs.items()
           if k not in {"case_name", "decision_date", "file_path", "text"}},
    )
    case_law_id = UUID(str(record["id"]))

    try:
        stored_chunks = await _chunk_embed_store(case_law_id, raw_text, page_offsets, page_count, progress)

        # Step 9: multimodal — uniform: flag + PDF + page_count, NOT intake type.
        if (config.MULTIMODAL_ENABLED and page_count > 0
                and staged is not None and staged.suffix.lower() == ".pdf"):
            try:
                await progress("embedding_images", 70, f"מטמיע {page_count} עמודי תמונה (multimodal)")
                await _embed_pages(case_law_id, staged, page_count)
            except Exception as e:
                logger.warning("Multimodal embedding failed (non-fatal): %s", e)

        # Steps 10-12: queue BOTH extractions (GAP-02 fix) + statuses.
        await db.set_case_law_extraction_status(case_law_id, "completed")
        await db.set_case_law_halacha_status(case_law_id, "pending")
        await db.request_metadata_extraction(case_law_id)
        await db.request_halacha_extraction(case_law_id)

        await progress("completed", 100,
                       f"נקלט: {stored_chunks} chunks. חילוץ הלכות ומטא-דאטה ממתינים בתור.")
        return {
            "status": "completed",
            "case_law_id": str(case_law_id),
            "chunks": stored_chunks,
            "halachot": 0,
            "halachot_pending": True,
            "metadata_filled": [],
            "pages": page_count,
        }
    except Exception as e:
        logger.exception("ingest_document failed (%s): %s", spec.source_kind, e)
        await db.set_case_law_extraction_status(case_law_id, "failed")
        await progress("failed", 100, f"כשל בעיבוד: {e}")
        raise


async def _chunk_embed_store(case_law_id, text, page_offsets, page_count, progress) -> int:
    """Steps 7-8: chunk (hierarchical/flat by flag) → embed children → store."""
    if config.PARENT_DOC_RETRIEVAL_ENABLED:
        await progress("chunking", 40, f"מחלק את הטקסט ל-chunks היררכיים ({page_count} עמ')")
        h_chunks = chunker.chunk_document_hierarchical(text, page_offsets=page_offsets)
        if not h_chunks:
            return 0
        children = [c for c in h_chunks if c.role == "child"]
        parents = [c for c in h_chunks if c.role == "parent"]
        await progress("embedding", 55, f"מייצר embeddings ל-{len(children)} children ({len(parents)} parents)")
        child_vectors = await embeddings.embed_texts([c.content for c in children], input_type="document")
        chunk_dicts: list[dict] = []
        for p in parents:
            chunk_dicts.append({
                "role": "parent", "local_id": p.local_id, "parent_local_id": None,
                "chunk_index": p.chunk_index, "content": p.content,
                "section_type": p.section_type, "page_number": p.page_number, "embedding": None,
            })
        for c, v in zip(children, child_vectors):
            chunk_dicts.append({
                "role": "child", "local_id": c.local_id, "parent_local_id": c.parent_local_id,
                "chunk_index": c.chunk_index, "content": c.content,
                "section_type": c.section_type, "page_number": c.page_number, "embedding": v,
            })
        counts = await db.store_precedent_chunks_hierarchical(case_law_id, chunk_dicts)
        return counts["children"]
    else:
        await progress("chunking", 40, f"מחלק את הטקסט ל-chunks ({page_count} עמ')")
        chunks = chunker.chunk_document(text, page_offsets=page_offsets)
        if not chunks:
            return 0
        await progress("embedding", 55, f"מייצר embeddings ל-{len(chunks)} chunks")
        chunk_vectors = await embeddings.embed_texts([c.content for c in chunks], input_type="document")
        chunk_dicts = [
            {"chunk_index": c.chunk_index, "content": c.content,
             "section_type": c.section_type, "page_number": c.page_number, "embedding": v}
            for c, v in zip(chunks, chunk_vectors)
        ]
        return await db.store_precedent_chunks(case_law_id, chunk_dicts)
  • Step 2: Verify import

Run: cd ~/legal-ai/mcp-server && .venv/bin/python -c "from legal_mcp.services import ingest; print(ingest.ingest_document.__name__)" Expected: prints ingest_document.

  • Step 3: Commit
cd ~/legal-ai
git add mcp-server/src/legal_mcp/services/ingest.py
git commit -m "feat(ingest): canonical ingest_document pipeline (FU-1)"

Note on create_record kwargs: the wrappers (Tasks 4-5) build inputs so the leftover keys after popping case_name/decision_date/file_path/text exactly match each DB-create's remaining parameters. Verify against the signatures: create_external_case_law(case_number, full_text, court, practice_area, appeal_subtype, subject_tags, summary, headnote, source_type, precedent_level, is_binding, ...) and create_internal_committee_decision(case_number, full_text, court, chair_name, district, practice_area, appeal_subtype, subject_tags, summary, is_binding, proceeding_type, ...).


Task 4: External spec + rewrite ingest_precedent as wrapper

Files:

  • Modify: mcp-server/src/legal_mcp/services/precedent_library.py

  • Step 1: Replace the top-of-file ingest section with a spec + wrapper

Replace the body of ingest_precedent (lines ~88-317) and remove _stage_file, _coerce_date, _safe_filename, _embed_precedent_pages, and the _VALID_* constants used only by ingest. Keep _VALID_PRACTICE_AREAS/_VALID_SOURCE_TYPES values but move them into the spec. Add:

from legal_mcp.services import ingest

PRECEDENT_LIBRARY_DIR = Path(config.DATA_DIR) / "precedent-library"

_VALID_PRACTICE_AREAS = frozenset({"", "rishuy_uvniya", "betterment_levy", "compensation_197"})
_VALID_SOURCE_TYPES = frozenset({"", "court_ruling", "appeals_committee"})


def _external_validate(inputs: dict) -> None:
    citation = (inputs.get("citation") or "").strip()
    if not citation:
        raise ValueError("citation is required")
    if citation.startswith(("ערר ", "ערר(", 'בל"מ ', 'בל"מ(', "ARAR ")):
        raise ValueError(
            "ציטוט שמתחיל ב-'ערר' או 'בל\"מ' הוא החלטת ועדת ערר. "
            "השתמש ב-internal_decision_upload (דורש chair_name + district), "
            "לא ב-precedent_library_upload."
        )


def _external_staging_subdir(inputs: dict) -> str:
    st = inputs.get("source_type") or ""
    return st if st in {"court_ruling", "appeals_committee"} else "other"


_EXTERNAL_SPEC = ingest.IntakeSpec(
    source_kind="external_upload",
    id_field="citation",
    staging_root=PRECEDENT_LIBRARY_DIR,
    staging_subdir=_external_staging_subdir,
    validate=_external_validate,
    enum_fields={"practice_area": _VALID_PRACTICE_AREAS, "source_type": _VALID_SOURCE_TYPES},
    derive=lambda inputs: {},
    display_name_fallback="citation",
    create_record=_create_external_record,
)


async def _create_external_record(**kw) -> dict:
    """Adapter: maps canonical inputs (citation) to create_external_case_law(case_number)."""
    return await db.create_external_case_law(
        case_number=kw["citation"].strip(),
        case_name=kw["case_name"],
        full_text=kw["full_text"],
        court=(kw.get("court") or "").strip(),
        decision_date=kw.get("decision_date"),
        practice_area=kw.get("practice_area", ""),
        appeal_subtype=(kw.get("appeal_subtype") or "").strip(),
        subject_tags=list(kw.get("subject_tags") or []),
        summary=(kw.get("summary") or "").strip(),
        headnote=(kw.get("headnote") or "").strip(),
        source_type=kw.get("source_type", ""),
        precedent_level=kw.get("precedent_level", ""),
        is_binding=kw.get("is_binding", True),
        document_id=kw.get("document_id"),
    )


async def ingest_precedent(
    *,
    file_path: str | Path,
    citation: str,
    case_name: str = "",
    court: str = "",
    decision_date=None,
    source_type: str = "",
    precedent_level: str = "",
    practice_area: str = "",
    appeal_subtype: str = "",
    subject_tags: list[str] | None = None,
    is_binding: bool = True,
    headnote: str = "",
    summary: str = "",
    document_id: UUID | None = None,
    progress: ingest.ProgressCb | None = None,
) -> dict:
    """Ingest one external precedent. Thin wrapper over the canonical pipeline."""
    inputs = {
        "citation": citation, "case_name": case_name, "court": court,
        "decision_date": decision_date, "source_type": source_type,
        "precedent_level": precedent_level, "practice_area": practice_area,
        "appeal_subtype": appeal_subtype, "subject_tags": subject_tags,
        "is_binding": is_binding, "headnote": headnote, "summary": summary,
    }
    return await ingest.ingest_document(
        _EXTERNAL_SPEC, inputs=inputs, file_path=file_path,
        document_id=document_id, progress=progress,
    )

Define _create_external_record ABOVE _EXTERNAL_SPEC (Python resolves the name at dataclass-construction time). Reorder if needed.

  • Step 2: Run external-path tests

Run: cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_unified_ingest.py -k "external" -v Expected: test_external_queues_both, test_enum_validation_rejects_bad_practice_area_external, test_external_citation_guard_still_blocks_arar PASS.

  • Step 3: Commit
cd ~/legal-ai
git add mcp-server/src/legal_mcp/services/precedent_library.py
git commit -m "refactor(ingest): ingest_precedent delegates to canonical pipeline (FU-1)"

Task 5: Internal spec + rewrite ingest_internal_decision as wrapper

Files:

  • Modify: mcp-server/src/legal_mcp/services/internal_decisions.py

  • Step 1: Replace the ingest section with a spec + wrapper

Remove _coerce_date, _safe_filename, and the inline pipeline body of ingest_internal_decision (lines ~73-220). Keep _VALID_DISTRICTS, _COURT_TO_DISTRICT, _district_from_court, and all migrate_/enrich_/search_internal functions. Add:

from legal_mcp.services import ingest

INTERNAL_DECISIONS_DIR = Path(config.DATA_DIR) / "internal-decisions"

_VALID_PRACTICE_AREAS = frozenset({"", "rishuy_uvniya", "betterment_levy", "compensation_197"})
_VALID_DISTRICTS = frozenset({"", "ירושלים", "מרכז", "תל אביב", "צפון", "דרום", "ארצי"})


def _internal_validate(inputs: dict) -> None:
    if not (inputs.get("case_number") or "").strip():
        raise ValueError("case_number is required")


def _internal_derive(inputs: dict) -> dict:
    district = (inputs.get("district") or "").strip() or _district_from_court(inputs.get("court") or "")
    proc = (inputs.get("proceeding_type") or "").strip() or derive_proceeding_type(
        appeal_subtype=inputs.get("appeal_subtype") or "", subject=inputs.get("case_name") or "",
    )
    return {"district": district, "proceeding_type": proc}


async def _create_internal_record(**kw) -> dict:
    return await db.create_internal_committee_decision(
        case_number=kw["case_number"].strip(),
        case_name=kw["case_name"],
        full_text=kw["full_text"],
        court=(kw.get("court") or "").strip(),
        decision_date=kw.get("decision_date"),
        chair_name=(kw.get("chair_name") or "").strip(),
        district=kw.get("district", ""),
        practice_area=kw.get("practice_area", ""),
        appeal_subtype=(kw.get("appeal_subtype") or "").strip(),
        subject_tags=list(kw.get("subject_tags") or []),
        summary=(kw.get("summary") or "").strip(),
        is_binding=kw.get("is_binding", True),
        document_id=kw.get("document_id"),
        proceeding_type=kw.get("proceeding_type") or "ערר",
    )


_INTERNAL_SPEC = ingest.IntakeSpec(
    source_kind="internal_committee",
    id_field="case_number",
    staging_root=INTERNAL_DECISIONS_DIR,
    staging_subdir=lambda inputs: (inputs.get("district") or "other"),
    validate=_internal_validate,
    enum_fields={"practice_area": _VALID_PRACTICE_AREAS, "district": _VALID_DISTRICTS},
    derive=_internal_derive,
    display_name_fallback="case_number",
    create_record=_create_internal_record,
)


async def ingest_internal_decision(
    *,
    case_number: str,
    case_name: str = "",
    court: str = "",
    decision_date=None,
    chair_name: str = "",
    district: str = "",
    practice_area: str = "",
    appeal_subtype: str = "",
    subject_tags: list[str] | None = None,
    summary: str = "",
    is_binding: bool = True,
    file_path: str | Path | None = None,
    text: str | None = None,
    document_id: UUID | None = None,
    queue_halachot: bool = True,  # retained for signature compat; pipeline always queues
    proceeding_type: str = "",
) -> dict:
    """Ingest one appeals-committee decision. Thin wrapper over the canonical pipeline."""
    inputs = {
        "case_number": case_number, "case_name": case_name, "court": court,
        "decision_date": decision_date, "chair_name": chair_name, "district": district,
        "practice_area": practice_area, "appeal_subtype": appeal_subtype,
        "subject_tags": subject_tags, "summary": summary, "is_binding": is_binding,
        "proceeding_type": proceeding_type,
    }
    out = await ingest.ingest_document(
        _INTERNAL_SPEC, inputs=inputs, file_path=file_path, text=text,
        document_id=document_id,
    )
    return {"status": out["status"], "case_law_id": out["case_law_id"],
            "chunks": out["chunks"], "halachot_pending": True}

queue_halachot=False was only used by migrate_from_style_corpus. The canonical pipeline always queues both (per INV-ING3). Confirm with the user during execution that bulk re-migration queueing is acceptable; the migrate path is out of FU-1 scope but calls this wrapper. If suppression is still required, that is a follow-up — note it, do not silently drop.

  • Step 2: Run the full test file

Run: cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_unified_ingest.py -v Expected: ALL 9 tests PASS — including test_internal_queues_BOTH_metadata_and_halacha (GAP-02).

  • Step 3: Commit
cd ~/legal-ai
git add mcp-server/src/legal_mcp/services/internal_decisions.py
git commit -m "refactor(ingest): ingest_internal_decision delegates to canonical pipeline; queue metadata too (GAP-02, FU-1)"

Task 6: Dead-code sweep, smoke import, full suite

Files:

  • Verify: mcp-server/src/legal_mcp/services/precedent_library.py, internal_decisions.py

  • Step 1: Confirm no orphaned references to removed helpers

Run: cd ~/legal-ai/mcp-server && grep -rn "_embed_precedent_pages\|_stage_file\|_safe_filename\|_coerce_date" src/legal_mcp/services/precedent_library.py src/legal_mcp/services/internal_decisions.py Expected: NO matches (all moved to ingest.py). If any remain in code paths other than ingest, leave them; if orphaned, delete.

  • Step 2: Smoke-import every affected module + its callers

Run:

cd ~/legal-ai/mcp-server && .venv/bin/python -c "
from legal_mcp.services import ingest, precedent_library, internal_decisions
from legal_mcp.tools import precedent_library as t1, internal_decisions as t2
import inspect
sig_p = inspect.signature(precedent_library.ingest_precedent)
sig_i = inspect.signature(internal_decisions.ingest_internal_decision)
assert 'citation' in sig_p.parameters and 'file_path' in sig_p.parameters
assert 'case_number' in sig_i.parameters and 'text' in sig_i.parameters
print('signatures preserved; imports clean')
"

Expected: prints signatures preserved; imports clean.

  • Step 3: Run the entire test suite (no regressions elsewhere)

Run: cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/ -q Expected: all pre-existing tests still pass + the 9 new ones.

  • Step 4: Lint the changed files (match repo style)

Run: cd ~/legal-ai/mcp-server && .venv/bin/python -m ruff check src/legal_mcp/services/ingest.py src/legal_mcp/services/precedent_library.py src/legal_mcp/services/internal_decisions.py 2>/dev/null || echo "ruff not configured — skip" Expected: clean, or "skip".

  • Step 5: Update TaskMaster #59 → done

Mark subtasks 59.1-59.4 and task 59 as done via task-master (verify via MCP get_task).

  • Step 6: Final commit
cd ~/legal-ai
git add -A mcp-server/
git commit -m "chore(ingest): dead-code sweep + smoke checks for unified pipeline (FU-1)"

Self-Review Notes

  • GAP-01 (single path) → Tasks 2-5. GAP-02 (metadata queue) → Task 3 step 1 + test test_internal_queues_BOTH_metadata_and_halacha. GAP-04 (enum validation) → _validate_enums + tests. GAP-05 (staging/derive/multimodal/fallback/guard unified) → Task 3 + specs in Tasks 4-5.
  • Boundary preserved: DB-create functions untouched (routed via create_record); no migration.
  • Open execution check: queue_halachot=False suppression in migrate_from_style_corpus (Task 5 note) — surface to user, do not silently change bulk-migration behavior.
  • claude_session rule: ingest.py imports only db/chunker/embeddings/extractor — no LLM extractors. Safe for container.