# FU-1 Unified Ingest Path: Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Collapse the two parallel ingest functions (`ingest_precedent`, `ingest_internal_decision`) into one canonical pipeline parameterized by an `IntakeSpec`, closing GAP-01/02/04/05.

**Architecture:** New module `services/ingest.py` holds a Template-Method skeleton `ingest_document(spec, ...)`; per-type variation rides on a frozen `IntakeSpec` config object (staging resolver, validate callable, enum_fields data, derive callable, display-name fallback, injected `create_record`). The two existing public functions stay as named entry points that build a spec and delegate. The DB-create functions are NOT merged (FU-2 boundary); they are only routed via `spec.create_record`.

**Tech Stack:** Python 3.12, asyncpg, pytest (offline, monkeypatched I/O), local `.venv` at `mcp-server/.venv`.

**Spec:** [docs/superpowers/specs/2026-05-30-fu1-unified-ingest-design.md](../specs/2026-05-30-fu1-unified-ingest-design.md)

**Run tests with:** `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_unified_ingest.py -v`

---

## File Structure

- **Create** `mcp-server/src/legal_mcp/services/ingest.py`: canonical pipeline + `IntakeSpec` + shared helpers (`_stage_file`, `_coerce_date`, `_safe_filename`, `_embed_pages`).
- **Create** `mcp-server/tests/test_unified_ingest.py`: offline behavioral tests.
- **Modify** `mcp-server/src/legal_mcp/services/precedent_library.py`: `ingest_precedent` becomes a thin wrapper building `_EXTERNAL_SPEC`; delete inline pipeline + moved helpers; keep everything else (search, reextract, process_pending, list, delete, get).
- **Modify** `mcp-server/src/legal_mcp/services/internal_decisions.py`: `ingest_internal_decision` becomes a thin wrapper building `_INTERNAL_SPEC`; delete inline pipeline + moved helpers; keep `migrate_*`, `enrich_*`, `search_internal`.

**Unchanged callers (verify, don't edit):** `tools/precedent_library.py`, `tools/internal_decisions.py`, `web/` HTTP handlers. They call the two public functions, whose signatures are preserved.

---

## Task 1: Failing tests for the unified pipeline

**Files:**
- Test: `mcp-server/tests/test_unified_ingest.py`

- [ ] **Step 1: Write the failing tests**

```python
"""FU-1: unified ingest pipeline tests (offline, all I/O monkeypatched).

Proves both intake types flow through services.ingest.ingest_document and that
the canonical pipeline is symmetric: BOTH metadata and halacha extraction are
queued for BOTH types (GAP-02 regression), enum validation applies to both
(GAP-04), multimodal is gated by flag+PDF not by intake type (GAP-05), and the
external citation guard is preserved.
"""
from __future__ import annotations

import asyncio
from pathlib import Path
from uuid import uuid4

import pytest

from legal_mcp import config
from legal_mcp.services import db, embeddings, chunker, extractor
from legal_mcp.services import ingest, precedent_library, internal_decisions


def _run(coro):
    return asyncio.run(coro)


class _Chunk:
    def __init__(self, i):
        self.chunk_index = i
        self.content = f"chunk-{i}"
        self.section_type = "body"
        self.page_number = 1
        self.role = "child"
        self.local_id = f"c{i}"
        self.parent_local_id = None


@pytest.fixture()
def patched(monkeypatch, tmp_path):
    """Patch every I/O boundary. Record queue + create calls."""
    calls = {"metadata": [], "halacha": [], "create": [], "chunks": [], "pages": []}

    async def _extract_text(path):
        return ("full decision text", 2, [0, 100])

    def _strip(text):
        return text

    def _chunk(text, page_offsets=None):
        return [_Chunk(0), _Chunk(1)]

    async def _embed(texts, input_type="document"):
        return [[0.0] * 8 for _ in texts]

    async def _store_chunks(cid, dicts):
        calls["chunks"].append((cid, len(dicts)))
        return len(dicts)

    async def _create_external(**kw):
        calls["create"].append(("external", kw))
        return {"id": uuid4()}

    async def _create_internal(**kw):
        calls["create"].append(("internal", kw))
        return {"id": uuid4()}

    async def _req_meta(cid):
        calls["metadata"].append(cid)

    async def _req_hal(cid):
        calls["halacha"].append(cid)

    async def _set_status(cid, status):
        return None

    monkeypatch.setattr(extractor, "extract_text", _extract_text)
    monkeypatch.setattr(extractor, "strip_nevo_preamble", _strip)
    monkeypatch.setattr(chunker, "chunk_document", _chunk)
    monkeypatch.setattr(embeddings, "embed_texts", _embed)
    monkeypatch.setattr(db, "store_precedent_chunks", _store_chunks)
    monkeypatch.setattr(db, "create_external_case_law", _create_external)
    monkeypatch.setattr(db, "create_internal_committee_decision", _create_internal)
    monkeypatch.setattr(db, "request_metadata_extraction", _req_meta)
    monkeypatch.setattr(db, "request_halacha_extraction", _req_hal)
    monkeypatch.setattr(db, "set_case_law_extraction_status", _set_status)
    monkeypatch.setattr(db, "set_case_law_halacha_status", _set_status)
    # Staging copies files under config.DATA_DIR; keep tests off the real tree
    # by handing the source path straight back.
    monkeypatch.setattr(ingest, "_stage_file", lambda src, root, subdir: src)
    # Force flat chunking + multimodal OFF unless a test flips it.
    monkeypatch.setattr(config, "PARENT_DOC_RETRIEVAL_ENABLED", False)
    monkeypatch.setattr(config, "MULTIMODAL_ENABLED", False)
    return calls


def _make_pdf(tmp_path) -> str:
    p = tmp_path / "decision.pdf"
    p.write_bytes(b"%PDF-1.4 fake")
    return str(p)


def test_internal_queues_BOTH_metadata_and_halacha(patched, tmp_path):
    """GAP-02 regression: the internal path must queue metadata too."""
    _run(internal_decisions.ingest_internal_decision(
        case_number="8046/24", text="decision text",
        chair_name="דפנה תמיר", district="ירושלים",
        practice_area="betterment_levy",
    ))
    assert len(patched["metadata"]) == 1, "internal path must queue metadata (GAP-02)"
    assert len(patched["halacha"]) == 1


def test_external_queues_both(patched, tmp_path):
    _run(precedent_library.ingest_precedent(
        file_path=_make_pdf(tmp_path), citation="עע\"מ 1234/20",
        practice_area="rishuy_uvniya", source_type="court_ruling",
    ))
    assert len(patched["metadata"]) == 1
    assert len(patched["halacha"]) == 1


def test_both_types_go_through_ingest_document(patched, tmp_path, monkeypatch):
    seen = []
    real = ingest.ingest_document

    async def _spy(spec, **kw):
        seen.append(spec.source_kind)
        return await real(spec, **kw)

    monkeypatch.setattr(ingest, "ingest_document", _spy)
    _run(internal_decisions.ingest_internal_decision(
        case_number="8046/24", text="t", chair_name="דפנה תמיר",
        practice_area="betterment_levy"))
    _run(precedent_library.ingest_precedent(
        file_path=_make_pdf(tmp_path), citation="עע\"מ 1/20",
        practice_area="rishuy_uvniya"))
    assert seen == ["internal_committee", "external_upload"]


def test_enum_validation_rejects_bad_practice_area_internal(patched, tmp_path):
    """GAP-04: internal path must validate enums like the external one."""
    with pytest.raises(ValueError, match="practice_area"):
        _run(internal_decisions.ingest_internal_decision(
            case_number="8046/24", text="t", chair_name="x",
            practice_area="bogus"))


def test_enum_validation_rejects_bad_practice_area_external(patched, tmp_path):
    with pytest.raises(ValueError, match="practice_area"):
        _run(precedent_library.ingest_precedent(
            file_path=_make_pdf(tmp_path), citation="עע\"מ 1/20",
            practice_area="bogus"))


def test_external_citation_guard_still_blocks_arar(patched, tmp_path):
    with pytest.raises(ValueError, match="ערר"):
        _run(precedent_library.ingest_precedent(
            file_path=_make_pdf(tmp_path), citation="ערר 1234/24"))


def test_internal_text_path_works_without_file(patched):
    out = _run(internal_decisions.ingest_internal_decision(
        case_number="8046/24", text="t", chair_name="x",
        practice_area="betterment_levy"))
    assert out["status"] == "completed"
    assert out["case_law_id"]


def test_internal_requires_file_or_text(patched):
    with pytest.raises(ValueError, match="file_path or text"):
        _run(internal_decisions.ingest_internal_decision(
            case_number="8046/24", chair_name="x",
            practice_area="betterment_levy"))


def test_display_name_fallback_uses_canonical_id(patched, tmp_path):
    _run(internal_decisions.ingest_internal_decision(
        case_number="8046/24", text="t", chair_name="x",
        practice_area="betterment_levy"))
    kind, kw = patched["create"][0]
    assert kw["case_name"] == "8046/24", "missing case_name falls back to canonical id"
```

- [ ] **Step 2: Run tests to verify they fail**

Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_unified_ingest.py -v`
Expected: FAIL at collection with an `ImportError` for `legal_mcp.services.ingest` (typically `cannot import name 'ingest'`; a `ModuleNotFoundError` is equally acceptable).

- [ ] **Step 3: Commit the red tests**

```bash
cd ~/legal-ai
git add mcp-server/tests/test_unified_ingest.py
git commit -m "test(ingest): failing tests for unified pipeline (FU-1)"
```

---

## Task 2: Canonical module `ingest.py`: IntakeSpec + shared helpers

**Files:**
- Create: `mcp-server/src/legal_mcp/services/ingest.py`

- [ ] **Step 1: Write the module header, IntakeSpec, and shared helpers**

```python
"""Canonical ingest pipeline (FU-1).

One pipeline for all sibling-entity intake types (external precedent, internal
committee decision). Per-type variation rides on an ``IntakeSpec`` config
object, never a parallel function. See docs/spec/01-ingest.md and
docs/superpowers/specs/2026-05-30-fu1-unified-ingest-design.md.

claude_session rule preserved: this module only QUEUES extraction
(``request_*_extraction`` = pure DB writes). It never imports
halacha_extractor / precedent_metadata_extractor, so it is safe to call from
the FastAPI container where the ``claude`` CLI is unavailable.
"""
from __future__ import annotations

import asyncio
import logging
import re
import shutil
from dataclasses import dataclass
from datetime import date
from pathlib import Path
from typing import Awaitable, Callable
from uuid import UUID, uuid4

from legal_mcp import config
from legal_mcp.services import chunker, db, embeddings, extractor

logger = logging.getLogger(__name__)

ProgressCb = Callable[[str, int, str], Awaitable[None]]


async def _noop_progress(_status: str, _percent: int, _msg: str) -> None:
    return None


@dataclass(frozen=True)
class IntakeSpec:
    """Describes everything that varies between intake types."""
    source_kind: str
    id_field: str
    staging_root: Path
    staging_subdir: Callable[[dict], str]
    validate: Callable[[dict], None]
    enum_fields: dict[str, frozenset[str]]
    derive: Callable[[dict], dict]
    display_name_fallback: str
    create_record: Callable[..., Awaitable[dict]]


def _coerce_date(value) -> date | None:
    """Accept a date, an ISO string (first 10 chars), or empty; else None."""
    if value is None or value == "":
        return None
    if isinstance(value, date):
        return value
    if isinstance(value, str):
        try:
            return date.fromisoformat(value[:10])
        except ValueError:
            return None
    return None


def _safe_filename(name: str) -> str:
    # Keep word chars, dots, dashes, plus, spaces and Hebrew letters.
    base = Path(name).name
    return re.sub(r"[^\w.\-+א-ת ]", "_", base) or f"upload-{uuid4().hex[:8]}"


def _stage_file(src_path: Path, root: Path, subdir: str) -> Path:
    dest_dir = root / (subdir or "other")
    dest_dir.mkdir(parents=True, exist_ok=True)
    dest = dest_dir / f"{uuid4().hex[:8]}_{_safe_filename(src_path.name)}"
    shutil.copy2(src_path, dest)
    return dest


def _validate_enums(spec: IntakeSpec, inputs: dict) -> None:
    for field_name, allowed in spec.enum_fields.items():
        value = inputs.get(field_name, "") or ""
        if value not in allowed:
            raise ValueError(f"invalid {field_name}: {value!r}")
```

- [ ] **Step 2: Add the multimodal page-embed helper (moved verbatim from precedent_library.py)**

```python
async def _embed_pages(case_law_id: UUID, pdf_path: Path, page_count: int) -> dict:
    """Render PDF pages → embed via voyage-multimodal → store. Non-fatal caller."""
    thumb_dir = spec_thumb_dir(case_law_id)
    rendered = await asyncio.to_thread(
        extractor.render_pages_for_multimodal,
        pdf_path, config.MULTIMODAL_DPI, config.MULTIMODAL_THUMB_DPI, thumb_dir,
    )
    images = [pil for pil, _ in rendered]
    thumbs = [t for _, t in rendered]
    img_embs = await embeddings.embed_images(images)
    page_records = []
    for i, (emb, thumb) in enumerate(zip(img_embs, thumbs)):
        rel_thumb = None
        if thumb is not None:
            try:
                rel_thumb = str(thumb.relative_to(config.DATA_DIR))
            except ValueError:
                rel_thumb = str(thumb)
        page_records.append({
            "page_number": i + 1,
            "embedding": emb,
            "image_thumbnail_path": rel_thumb,
        })
    stored = await db.store_precedent_image_embeddings(
        case_law_id, page_records, model_name=config.MULTIMODAL_MODEL,
    )
    logger.info("Multimodal: stored %d page-image embeddings for case_law %s",
                stored, case_law_id)
    return {"pages_embedded": stored}


def spec_thumb_dir(case_law_id: UUID) -> Path:
    """Thumbnails live under the precedent-library tree regardless of intake type."""
    return Path(config.DATA_DIR) / "precedent-library" / "thumbnails" / str(case_law_id)
```

- [ ] **Step 3: Verify the module imports cleanly**

Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -c "from legal_mcp.services import ingest; print(ingest.IntakeSpec.__name__)"`
Expected: prints `IntakeSpec`, no error.

- [ ] **Step 4: Commit**

```bash
cd ~/legal-ai
git add mcp-server/src/legal_mcp/services/ingest.py
git commit -m "feat(ingest): IntakeSpec + shared helpers for canonical pipeline (FU-1)"
```

---

## Task 3: Canonical `ingest_document`

**Files:**
- Modify: `mcp-server/src/legal_mcp/services/ingest.py` (append `ingest_document`)

- [ ] **Step 1: Append the canonical pipeline function**

```python
async def ingest_document(
    spec: IntakeSpec,
    *,
    inputs: dict,
    file_path: str | Path | None = None,
    text: str | None = None,
    document_id: UUID | None = None,
    progress: ProgressCb | None = None,
) -> dict:
    """Run the canonical 12-step pipeline for one intake item.

    ``inputs`` carries the type-specific record fields (citation/case_number,
    case_name, court, practice_area, etc.). ``spec`` decides how they are
    validated, staged, derived, and which DB-create runs.

    Returns a dict with at least: status, case_law_id, chunks.
    Progress messages are user-facing Hebrew strings.
    """
    progress = progress or _noop_progress

    # Step 1: input validation (type-specific) + enums (uniform mechanism).
    if not file_path and text is None:
        raise ValueError("either file_path or text is required")
    spec.validate(inputs)
    _validate_enums(spec, inputs)

    # Step 2: field derivation (identity for external).
    inputs = {**inputs, **spec.derive(inputs)}

    # Steps 3-5: stage (if file) + extract + strip.
    page_count = 0
    page_offsets = None
    staged: Path | None = None
    if file_path:
        src = Path(file_path)
        if not src.is_file():
            raise FileNotFoundError(f"file not found: {src}")
        await progress("staging", 5, "מעתיק את הקובץ לאחסון")  # copying file to storage
        staged = _stage_file(src, spec.staging_root, spec.staging_subdir(inputs))
        await progress("extracting", 15, "מחלץ טקסט מהקובץ")  # extracting text
        try:
            raw_text, page_count, page_offsets = await extractor.extract_text(str(staged))
        except Exception as e:
            await progress("failed", 100, f"כשל בחילוץ טקסט: {e}")  # text extraction failed
            raise
        raw_text = extractor.strip_nevo_preamble(raw_text or "").strip()
    else:
        raw_text = (text or "").strip()

    if not raw_text:
        await progress("failed", 100, "לא נמצא טקסט בקובץ")  # no text found in the file
        raise ValueError("no extractable text in file")

    # Step 6: DB create (type-specific, routed); yields case_law_id.
    await progress("storing_metadata", 25, "שומר את הרשומה במסד הנתונים")  # saving record
    display_name = (inputs.get("case_name") or "").strip() or (
        inputs.get(spec.display_name_fallback) or ""
    ).strip()
    record = await spec.create_record(
        full_text=raw_text,
        case_name=display_name,
        decision_date=_coerce_date(inputs.get("decision_date")),
        document_id=document_id,
        **{k: v for k, v in inputs.items()
           if k not in {"case_name", "decision_date", "file_path", "text"}},
    )
    case_law_id = UUID(str(record["id"]))

    try:
        stored_chunks = await _chunk_embed_store(
            case_law_id, raw_text, page_offsets, page_count, progress)

        # Step 9: multimodal, gated uniformly by flag + PDF + page_count, NOT intake type.
        if (config.MULTIMODAL_ENABLED and page_count > 0
                and staged is not None and staged.suffix.lower() == ".pdf"):
            try:
                await progress("embedding_images", 70,
                               f"מטמיע {page_count} עמודי תמונה (multimodal)")
                await _embed_pages(case_law_id, staged, page_count)
            except Exception as e:
                logger.warning("Multimodal embedding failed (non-fatal): %s", e)

        # Steps 10-12: queue BOTH extractions (GAP-02 fix) + statuses.
        await db.set_case_law_extraction_status(case_law_id, "completed")
        await db.set_case_law_halacha_status(case_law_id, "pending")
        await db.request_metadata_extraction(case_law_id)
        await db.request_halacha_extraction(case_law_id)

        # "Ingested: N chunks. Halacha and metadata extraction are queued."
        await progress("completed", 100,
                       f"נקלט: {stored_chunks} chunks. חילוץ הלכות ומטא-דאטה ממתינים בתור.")
        return {
            "status": "completed",
            "case_law_id": str(case_law_id),
            "chunks": stored_chunks,
            "halachot": 0,
            "halachot_pending": True,
            "metadata_filled": [],
            "pages": page_count,
        }
    except Exception as e:
        logger.exception("ingest_document failed (%s): %s", spec.source_kind, e)
        await db.set_case_law_extraction_status(case_law_id, "failed")
        await progress("failed", 100, f"כשל בעיבוד: {e}")  # processing failed
        raise


async def _chunk_embed_store(case_law_id, text, page_offsets, page_count, progress) -> int:
    """Steps 7-8: chunk (hierarchical/flat by flag) → embed children → store."""
    if config.PARENT_DOC_RETRIEVAL_ENABLED:
        await progress("chunking", 40,
                       f"מחלק את הטקסט ל-chunks היררכיים ({page_count} עמ')")
        h_chunks = chunker.chunk_document_hierarchical(text, page_offsets=page_offsets)
        if not h_chunks:
            return 0
        children = [c for c in h_chunks if c.role == "child"]
        parents = [c for c in h_chunks if c.role == "parent"]
        await progress("embedding", 55,
                       f"מייצר embeddings ל-{len(children)} children ({len(parents)} parents)")
        child_vectors = await embeddings.embed_texts(
            [c.content for c in children], input_type="document")
        chunk_dicts: list[dict] = []
        # Parents are stored without vectors; only children are embedded.
        for p in parents:
            chunk_dicts.append({
                "role": "parent", "local_id": p.local_id, "parent_local_id": None,
                "chunk_index": p.chunk_index, "content": p.content,
                "section_type": p.section_type, "page_number": p.page_number,
                "embedding": None,
            })
        for c, v in zip(children, child_vectors):
            chunk_dicts.append({
                "role": "child", "local_id": c.local_id,
                "parent_local_id": c.parent_local_id,
                "chunk_index": c.chunk_index, "content": c.content,
                "section_type": c.section_type, "page_number": c.page_number,
                "embedding": v,
            })
        counts = await db.store_precedent_chunks_hierarchical(case_law_id, chunk_dicts)
        return counts["children"]
    else:
        await progress("chunking", 40, f"מחלק את הטקסט ל-chunks ({page_count} עמ')")
        chunks = chunker.chunk_document(text, page_offsets=page_offsets)
        if not chunks:
            return 0
        await progress("embedding", 55, f"מייצר embeddings ל-{len(chunks)} chunks")
        chunk_vectors = await embeddings.embed_texts(
            [c.content for c in chunks], input_type="document")
        chunk_dicts = [
            {"chunk_index": c.chunk_index, "content": c.content,
             "section_type": c.section_type, "page_number": c.page_number,
             "embedding": v}
            for c, v in zip(chunks, chunk_vectors)
        ]
        return await db.store_precedent_chunks(case_law_id, chunk_dicts)
```

- [ ] **Step 2: Verify import**

Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -c "from legal_mcp.services import ingest; print(ingest.ingest_document.__name__)"`
Expected: prints `ingest_document`.

- [ ] **Step 3: Commit**

```bash
cd ~/legal-ai
git add mcp-server/src/legal_mcp/services/ingest.py
git commit -m "feat(ingest): canonical ingest_document pipeline (FU-1)"
```

> **Note on `create_record` kwargs:** the wrappers (Tasks 4-5) build `inputs` so that the
> keys left after excluding `case_name`/`decision_date`/`file_path`/`text` are exactly what
> each `create_record` adapter reads and maps onto its DB-create's remaining parameters
> (the external adapter renames `citation` to `case_number`). Verify against the signatures:
> `create_external_case_law(case_number, full_text, court, practice_area, appeal_subtype, subject_tags, summary, headnote, source_type, precedent_level, is_binding, ...)`
> and `create_internal_committee_decision(case_number, full_text, court, chair_name, district, practice_area, appeal_subtype, subject_tags, summary, is_binding, proceeding_type, ...)`.

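
To make the kwargs note concrete, here is a minimal offline sketch of how the Step 6 call splits `inputs` before handing them to `spec.create_record`. The names `split_create_kwargs` and `fake_adapter` and the sample values are hypothetical stand-ins, not code from the repo; only the excluded-key set and the display-name fallback mirror the Task 3 listing.

```python
import asyncio

# Keys that ingest_document passes explicitly or never forwards (mirrors Task 3).
_EXCLUDED = {"case_name", "decision_date", "file_path", "text"}


def split_create_kwargs(inputs: dict, display_name_fallback: str) -> dict:
    """Hypothetical helper: the kwargs a create_record adapter would receive."""
    display_name = (inputs.get("case_name") or "").strip() or (
        inputs.get(display_name_fallback) or ""
    ).strip()
    leftover = {k: v for k, v in inputs.items() if k not in _EXCLUDED}
    return {"case_name": display_name, **leftover}


async def fake_adapter(**kw) -> dict:
    """Hypothetical adapter: renames citation to case_number, like the external one."""
    return {"case_number": kw["citation"].strip(), "case_name": kw["case_name"]}


inputs = {"citation": " 1234/20 ", "case_name": "", "court": "", "text": "ignored"}
kwargs = split_create_kwargs(inputs, display_name_fallback="citation")
record = asyncio.run(fake_adapter(**kwargs))
print(sorted(kwargs))  # keys forwarded to the adapter
print(record)          # what the adapter would pass on to the DB-create
```

An empty `case_name` falls back to the field named by `display_name_fallback`, and `text` never reaches the adapter. Any key the adapter does not read is silently ignored, which is why the signature check in the note is still worth doing by hand.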
---

## Task 4: External spec + rewrite `ingest_precedent` as wrapper

**Files:**
- Modify: `mcp-server/src/legal_mcp/services/precedent_library.py`

- [ ] **Step 1: Replace the top-of-file ingest section with a spec + wrapper**

Replace the body of `ingest_precedent` (lines ~88-317) and remove `_stage_file`, `_coerce_date`, `_safe_filename`, `_embed_precedent_pages`, and any `_VALID_*` constants used only by ingest. Keep the `_VALID_PRACTICE_AREAS`/`_VALID_SOURCE_TYPES` values; they now feed the spec's `enum_fields`. Add:

```python
from legal_mcp.services import ingest

PRECEDENT_LIBRARY_DIR = Path(config.DATA_DIR) / "precedent-library"
_VALID_PRACTICE_AREAS = frozenset({"", "rishuy_uvniya", "betterment_levy", "compensation_197"})
_VALID_SOURCE_TYPES = frozenset({"", "court_ruling", "appeals_committee"})


def _external_validate(inputs: dict) -> None:
    citation = (inputs.get("citation") or "").strip()
    if not citation:
        raise ValueError("citation is required")
    if citation.startswith(("ערר ", "ערר(", 'בל"מ ', 'בל"מ(', "ARAR ")):
        # Message (Hebrew, user-facing): a citation starting with 'ערר' or 'בל"מ'
        # is an appeals-committee decision; use internal_decision_upload
        # (requires chair_name + district), not precedent_library_upload.
        raise ValueError(
            "ציטוט שמתחיל ב-'ערר' או 'בל\"מ' הוא החלטת ועדת ערר. "
            "השתמש ב-internal_decision_upload (דורש chair_name + district), "
            "לא ב-precedent_library_upload."
        )


def _external_staging_subdir(inputs: dict) -> str:
    st = inputs.get("source_type") or ""
    return st if st in {"court_ruling", "appeals_committee"} else "other"


async def _create_external_record(**kw) -> dict:
    """Adapter: maps canonical inputs (citation) to create_external_case_law(case_number)."""
    return await db.create_external_case_law(
        case_number=kw["citation"].strip(),
        case_name=kw["case_name"],
        full_text=kw["full_text"],
        court=(kw.get("court") or "").strip(),
        decision_date=kw.get("decision_date"),
        practice_area=kw.get("practice_area", ""),
        appeal_subtype=(kw.get("appeal_subtype") or "").strip(),
        subject_tags=list(kw.get("subject_tags") or []),
        summary=(kw.get("summary") or "").strip(),
        headnote=(kw.get("headnote") or "").strip(),
        source_type=kw.get("source_type", ""),
        precedent_level=kw.get("precedent_level", ""),
        is_binding=kw.get("is_binding", True),
        document_id=kw.get("document_id"),
    )


# Built at import time, so every callable it references must already be defined.
_EXTERNAL_SPEC = ingest.IntakeSpec(
    source_kind="external_upload",
    id_field="citation",
    staging_root=PRECEDENT_LIBRARY_DIR,
    staging_subdir=_external_staging_subdir,
    validate=_external_validate,
    enum_fields={"practice_area": _VALID_PRACTICE_AREAS,
                 "source_type": _VALID_SOURCE_TYPES},
    derive=lambda inputs: {},
    display_name_fallback="citation",
    create_record=_create_external_record,
)


async def ingest_precedent(
    *,
    file_path: str | Path,
    citation: str,
    case_name: str = "",
    court: str = "",
    decision_date=None,
    source_type: str = "",
    precedent_level: str = "",
    practice_area: str = "",
    appeal_subtype: str = "",
    subject_tags: list[str] | None = None,
    is_binding: bool = True,
    headnote: str = "",
    summary: str = "",
    document_id: UUID | None = None,
    progress: ingest.ProgressCb | None = None,
) -> dict:
    """Ingest one external precedent. Thin wrapper over the canonical pipeline."""
    inputs = {
        "citation": citation, "case_name": case_name, "court": court,
        "decision_date": decision_date, "source_type": source_type,
        "precedent_level": precedent_level, "practice_area": practice_area,
        "appeal_subtype": appeal_subtype, "subject_tags": subject_tags,
        "is_binding": is_binding, "headnote": headnote, "summary": summary,
    }
    return await ingest.ingest_document(
        _EXTERNAL_SPEC, inputs=inputs, file_path=file_path,
        document_id=document_id, progress=progress,
    )
```

> `_create_external_record` is defined ABOVE `_EXTERNAL_SPEC` on purpose: the spec is
> constructed at import time, so the name must already be bound. Keep that order.

- [ ] **Step 2: Run external-path tests**

Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_unified_ingest.py -k "external" -v`
Expected: `test_external_queues_both`, `test_enum_validation_rejects_bad_practice_area_external`, `test_external_citation_guard_still_blocks_arar` PASS.

- [ ] **Step 3: Commit**

```bash
cd ~/legal-ai
git add mcp-server/src/legal_mcp/services/precedent_library.py
git commit -m "refactor(ingest): ingest_precedent delegates to canonical pipeline (FU-1)"
```

---

## Task 5: Internal spec + rewrite `ingest_internal_decision` as wrapper

**Files:**
- Modify: `mcp-server/src/legal_mcp/services/internal_decisions.py`

- [ ] **Step 1: Replace the ingest section with a spec + wrapper**

Remove `_coerce_date`, `_safe_filename`, and the inline pipeline body of `ingest_internal_decision` (lines ~73-220). Keep `_VALID_DISTRICTS`, `_COURT_TO_DISTRICT`, `_district_from_court`, and all `migrate_*`/`enrich_*`/`search_internal` functions.
Add:

```python
from legal_mcp.services import ingest

INTERNAL_DECISIONS_DIR = Path(config.DATA_DIR) / "internal-decisions"
_VALID_PRACTICE_AREAS = frozenset({"", "rishuy_uvniya", "betterment_levy", "compensation_197"})
# Districts: Jerusalem, Center, Tel Aviv, North, South, National.
_VALID_DISTRICTS = frozenset({"", "ירושלים", "מרכז", "תל אביב", "צפון", "דרום", "ארצי"})


def _internal_validate(inputs: dict) -> None:
    if not (inputs.get("case_number") or "").strip():
        raise ValueError("case_number is required")


def _internal_derive(inputs: dict) -> dict:
    # Explicit values win; otherwise derive from court / subtype / subject.
    district = (inputs.get("district") or "").strip() or _district_from_court(
        inputs.get("court") or "")
    proc = (inputs.get("proceeding_type") or "").strip() or derive_proceeding_type(
        appeal_subtype=inputs.get("appeal_subtype") or "",
        subject=inputs.get("case_name") or "",
    )
    return {"district": district, "proceeding_type": proc}


async def _create_internal_record(**kw) -> dict:
    return await db.create_internal_committee_decision(
        case_number=kw["case_number"].strip(),
        case_name=kw["case_name"],
        full_text=kw["full_text"],
        court=(kw.get("court") or "").strip(),
        decision_date=kw.get("decision_date"),
        chair_name=(kw.get("chair_name") or "").strip(),
        district=kw.get("district", ""),
        practice_area=kw.get("practice_area", ""),
        appeal_subtype=(kw.get("appeal_subtype") or "").strip(),
        subject_tags=list(kw.get("subject_tags") or []),
        summary=(kw.get("summary") or "").strip(),
        is_binding=kw.get("is_binding", True),
        document_id=kw.get("document_id"),
        proceeding_type=kw.get("proceeding_type") or "ערר",  # default: "appeal"
    )


_INTERNAL_SPEC = ingest.IntakeSpec(
    source_kind="internal_committee",
    id_field="case_number",
    staging_root=INTERNAL_DECISIONS_DIR,
    staging_subdir=lambda inputs: (inputs.get("district") or "other"),
    validate=_internal_validate,
    enum_fields={"practice_area": _VALID_PRACTICE_AREAS,
                 "district": _VALID_DISTRICTS},
    derive=_internal_derive,
    display_name_fallback="case_number",
    create_record=_create_internal_record,
)


async def ingest_internal_decision(
    *,
    case_number: str,
    case_name: str = "",
    court: str = "",
    decision_date=None,
    chair_name: str = "",
    district: str = "",
    practice_area: str = "",
    appeal_subtype: str = "",
    subject_tags: list[str] | None = None,
    summary: str = "",
    is_binding: bool = True,
    file_path: str | Path | None = None,
    text: str | None = None,
    document_id: UUID | None = None,
    queue_halachot: bool = True,  # retained for signature compat; pipeline always queues
    proceeding_type: str = "",
) -> dict:
    """Ingest one appeals-committee decision. Thin wrapper over the canonical pipeline."""
    inputs = {
        "case_number": case_number, "case_name": case_name, "court": court,
        "decision_date": decision_date, "chair_name": chair_name,
        "district": district, "practice_area": practice_area,
        "appeal_subtype": appeal_subtype, "subject_tags": subject_tags,
        "summary": summary, "is_binding": is_binding,
        "proceeding_type": proceeding_type,
    }
    out = await ingest.ingest_document(
        _INTERNAL_SPEC, inputs=inputs, file_path=file_path, text=text,
        document_id=document_id,
    )
    return {"status": out["status"], "case_law_id": out["case_law_id"],
            "chunks": out["chunks"], "halachot_pending": True}
```

> `queue_halachot=False` was only used by `migrate_from_style_corpus`. The canonical pipeline
> always queues both (per INV-ING3). Confirm with the user during execution that bulk
> re-migration queueing is acceptable; the migrate path is out of FU-1 scope but calls this
> wrapper. If suppression is still required, that is a follow-up: note it, do not silently drop it.

- [ ] **Step 2: Run the full test file**

Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_unified_ingest.py -v`
Expected: ALL 9 tests PASS, including `test_internal_queues_BOTH_metadata_and_halacha` (GAP-02).

- [ ] **Step 3: Commit**

```bash
cd ~/legal-ai
git add mcp-server/src/legal_mcp/services/internal_decisions.py
git commit -m "refactor(ingest): ingest_internal_decision delegates to canonical pipeline; queue metadata too (GAP-02, FU-1)"
```

---

## Task 6: Dead-code sweep, smoke import, full suite

**Files:**
- Verify: `mcp-server/src/legal_mcp/services/precedent_library.py`, `internal_decisions.py`

- [ ] **Step 1: Confirm no orphaned references to removed helpers**

Run: `cd ~/legal-ai/mcp-server && grep -rn "_embed_precedent_pages\|_stage_file\|_safe_filename\|_coerce_date" src/legal_mcp/services/precedent_library.py src/legal_mcp/services/internal_decisions.py`
Expected: NO matches (all moved to `ingest.py`). If any remain in code paths other than ingest, leave them; if orphaned, delete.

- [ ] **Step 2: Smoke-import every affected module + its callers**

Run:
```bash
cd ~/legal-ai/mcp-server && .venv/bin/python -c "
from legal_mcp.services import ingest, precedent_library, internal_decisions
from legal_mcp.tools import precedent_library as t1, internal_decisions as t2
import inspect
sig_p = inspect.signature(precedent_library.ingest_precedent)
sig_i = inspect.signature(internal_decisions.ingest_internal_decision)
assert 'citation' in sig_p.parameters and 'file_path' in sig_p.parameters
assert 'case_number' in sig_i.parameters and 'text' in sig_i.parameters
print('signatures preserved; imports clean')
"
```
Expected: prints `signatures preserved; imports clean`.

- [ ] **Step 3: Run the entire test suite (no regressions elsewhere)**

Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/ -q`
Expected: all pre-existing tests still pass + the 9 new ones.

- [ ] **Step 4: Lint the changed files (match repo style)**

Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m ruff check src/legal_mcp/services/ingest.py src/legal_mcp/services/precedent_library.py src/legal_mcp/services/internal_decisions.py 2>/dev/null || echo "ruff not configured; skip"`
Expected: clean output, or only the "skip" line when ruff is unavailable. The fallback also fires when ruff exits non-zero, so any findings printed above the "skip" line are real lint errors to fix.

- [ ] **Step 5: Update TaskMaster #59 → done**

Mark subtasks 59.1-59.4 and task 59 as done via task-master (verify via MCP get_task).

- [ ] **Step 6: Final commit**

```bash
cd ~/legal-ai
git add -A mcp-server/
git commit -m "chore(ingest): dead-code sweep + smoke checks for unified pipeline (FU-1)"
```

---

## Self-Review Notes

- **GAP-01** (single path) → Tasks 2-5. **GAP-02** (metadata queue) → Task 3 step 1 + test `test_internal_queues_BOTH_metadata_and_halacha`. **GAP-04** (enum validation) → `_validate_enums` + tests. **GAP-05** (staging/derive/multimodal/fallback/guard unified) → Task 3 + specs in Tasks 4-5.
- **Boundary preserved:** DB-create functions untouched (routed via `create_record`); no migration.
- **Open execution check:** `queue_halachot=False` suppression in `migrate_from_style_corpus` (Task 5 note). Surface it to the user; do not silently change bulk-migration behavior.
- **claude_session rule:** `ingest.py` imports only config/db/chunker/embeddings/extractor; no LLM extractors. Safe for container.
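
The claude_session rule in the last note could be pinned down with a small static check. The sketch below is an assumption about how such a guard might look, not an existing test in the repo: `imported_names`, `FORBIDDEN`, and the inline `sample` source are hypothetical, and only the two forbidden module names come from the `ingest.py` module docstring.

```python
import ast

# Module names taken from the ingest.py docstring; the guard itself is hypothetical.
FORBIDDEN = {"halacha_extractor", "precedent_metadata_extractor"}


def imported_names(source: str) -> set[str]:
    """Collect the last dotted component of every imported module or name."""
    names: set[str] = set()
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            names.update(alias.name.split(".")[-1] for alias in node.names)
        elif isinstance(node, ast.ImportFrom):
            if node.module:
                names.add(node.module.split(".")[-1])
            names.update(alias.name for alias in node.names)
    return names


# Stand-in for the real file contents; mirrors the import lines in Task 2.
sample = (
    "from legal_mcp import config\n"
    "from legal_mcp.services import chunker, db, embeddings, extractor\n"
)
violations = imported_names(sample) & FORBIDDEN
print(violations)  # an empty set means the rule holds for this source
```

Inside the test suite the same function would presumably be fed the real module text (for example via `Path(ingest.__file__).read_text()`), so a future import of an LLM extractor fails a test instead of surfacing only in the container.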