diff --git a/docs/superpowers/plans/2026-05-30-fu1-unified-ingest.md b/docs/superpowers/plans/2026-05-30-fu1-unified-ingest.md new file mode 100644 index 0000000..bf3c3c0 --- /dev/null +++ b/docs/superpowers/plans/2026-05-30-fu1-unified-ingest.md @@ -0,0 +1,833 @@ +# FU-1 Unified Ingest Path — Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Collapse the two parallel ingest functions (`ingest_precedent`, `ingest_internal_decision`) into one canonical pipeline parameterized by an `IntakeSpec`, closing GAP-01/02/04/05. + +**Architecture:** New module `services/ingest.py` holds a Template-Method skeleton `ingest_document(spec, ...)`; per-type variation rides on a frozen `IntakeSpec` config object (staging resolver, validate callable, enum_fields data, derive callable, display-name fallback, injected `create_record`). The two existing public functions stay as named entry points that build a spec and delegate. The DB-create functions are NOT merged (FU-2 boundary) — only routed via `spec.create_record`. + +**Tech Stack:** Python 3.12, asyncpg, pytest (offline, monkeypatched I/O), local `.venv` at `mcp-server/.venv`. + +**Spec:** [docs/superpowers/specs/2026-05-30-fu1-unified-ingest-design.md](../specs/2026-05-30-fu1-unified-ingest-design.md) + +**Run tests with:** `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_unified_ingest.py -v` + +--- + +## File Structure + +- **Create** `mcp-server/src/legal_mcp/services/ingest.py` — canonical pipeline + `IntakeSpec` + shared helpers (`_stage_file`, `_coerce_date`, `_safe_filename`, `_embed_pages`). +- **Create** `mcp-server/tests/test_unified_ingest.py` — offline behavioral tests. +- **Modify** `mcp-server/src/legal_mcp/services/precedent_library.py` — `ingest_precedent` becomes a thin wrapper building `_EXTERNAL_SPEC`; delete inline pipeline + moved helpers; keep everything else (search, reextract, process_pending, list, delete, get). +- **Modify** `mcp-server/src/legal_mcp/services/internal_decisions.py` — `ingest_internal_decision` becomes a thin wrapper building `_INTERNAL_SPEC`; delete inline pipeline + moved helpers; keep migrate_*, enrich_*, search_internal. + +**Unchanged callers (verify, don't edit):** `tools/precedent_library.py`, `tools/internal_decisions.py`, `web/` HTTP handlers — they call the two public functions whose signatures are preserved. + +--- + +## Task 1: Failing tests for the unified pipeline + +**Files:** +- Test: `mcp-server/tests/test_unified_ingest.py` + +- [ ] **Step 1: Write the failing tests** + +```python +"""FU-1: unified ingest pipeline tests (offline, all I/O monkeypatched). + +Proves both intake types flow through services.ingest.ingest_document and that +the canonical pipeline is symmetric: BOTH metadata and halacha extraction are +queued for BOTH types (GAP-02 regression), enum validation applies to both +(GAP-04), multimodal is gated by flag+PDF not by intake type (GAP-05), and the +external citation guard is preserved. +""" +from __future__ import annotations + +import asyncio +from pathlib import Path +from uuid import uuid4 + +import pytest + +from legal_mcp import config +from legal_mcp.services import db, embeddings, chunker, extractor +from legal_mcp.services import ingest, precedent_library, internal_decisions + + +def _run(coro): + return asyncio.run(coro) + + +class _Chunk: + def __init__(self, i): + self.chunk_index = i + self.content = f"chunk-{i}" + self.section_type = "body" + self.page_number = 1 + self.role = "child" + self.local_id = f"c{i}" + self.parent_local_id = None + + +@pytest.fixture() +def patched(monkeypatch, tmp_path): + """Patch every I/O boundary. Record queue + create calls.""" + calls = {"metadata": [], "halacha": [], "create": [], "chunks": [], "pages": []} + + async def _extract_text(path): + return ("full decision text", 2, [0, 100]) + + def _strip(text): + return text + + def _chunk(text, page_offsets=None): + return [_Chunk(0), _Chunk(1)] + + async def _embed(texts, input_type="document"): + return [[0.0] * 8 for _ in texts] + + async def _store_chunks(cid, dicts): + calls["chunks"].append((cid, len(dicts))) + return len(dicts) + + async def _create_external(**kw): + calls["create"].append(("external", kw)) + return {"id": uuid4()} + + async def _create_internal(**kw): + calls["create"].append(("internal", kw)) + return {"id": uuid4()} + + async def _req_meta(cid): + calls["metadata"].append(cid) + + async def _req_hal(cid): + calls["halacha"].append(cid) + + async def _set_status(cid, status): + return None + + monkeypatch.setattr(extractor, "extract_text", _extract_text) + monkeypatch.setattr(extractor, "strip_nevo_preamble", _strip) + monkeypatch.setattr(chunker, "chunk_document", _chunk) + monkeypatch.setattr(embeddings, "embed_texts", _embed) + monkeypatch.setattr(db, "store_precedent_chunks", _store_chunks) + monkeypatch.setattr(db, "create_external_case_law", _create_external) + monkeypatch.setattr(db, "create_internal_committee_decision", _create_internal) + monkeypatch.setattr(db, "request_metadata_extraction", _req_meta) + monkeypatch.setattr(db, "request_halacha_extraction", _req_hal) + monkeypatch.setattr(db, "set_case_law_extraction_status", _set_status) + monkeypatch.setattr(db, "set_case_law_halacha_status", _set_status) + # Force flat chunking + multimodal OFF unless a test flips it. + monkeypatch.setattr(config, "PARENT_DOC_RETRIEVAL_ENABLED", False) + monkeypatch.setattr(config, "MULTIMODAL_ENABLED", False) + return calls + + +def _make_pdf(tmp_path) -> str: + p = tmp_path / "decision.pdf" + p.write_bytes(b"%PDF-1.4 fake") + return str(p) + + +def test_internal_queues_BOTH_metadata_and_halacha(patched, tmp_path): + """GAP-02 regression: the internal path must queue metadata too.""" + _run(internal_decisions.ingest_internal_decision( + case_number="8046/24", text="decision text", chair_name="דפנה תמיר", + district="ירושלים", practice_area="betterment_levy", + )) + assert len(patched["metadata"]) == 1, "internal path must queue metadata (GAP-02)" + assert len(patched["halacha"]) == 1 + + +def test_external_queues_both(patched, tmp_path): + _run(precedent_library.ingest_precedent( + file_path=_make_pdf(tmp_path), citation="עע\"מ 1234/20", + practice_area="rishuy_uvniya", source_type="court_ruling", + )) + assert len(patched["metadata"]) == 1 + assert len(patched["halacha"]) == 1 + + +def test_both_types_go_through_ingest_document(patched, tmp_path, monkeypatch): + seen = [] + real = ingest.ingest_document + + async def _spy(spec, **kw): + seen.append(spec.source_kind) + return await real(spec, **kw) + + monkeypatch.setattr(ingest, "ingest_document", _spy) + _run(internal_decisions.ingest_internal_decision( + case_number="8046/24", text="t", chair_name="דפנה תמיר", practice_area="betterment_levy")) + _run(precedent_library.ingest_precedent( + file_path=_make_pdf(tmp_path), citation="עע\"מ 1/20", practice_area="rishuy_uvniya")) + assert seen == ["internal_committee", "external_upload"] + + +def test_enum_validation_rejects_bad_practice_area_internal(patched, tmp_path): + """GAP-04: internal path must validate enums like the external one.""" + with pytest.raises(ValueError, match="practice_area"): + _run(internal_decisions.ingest_internal_decision( + case_number="8046/24", text="t", chair_name="x", practice_area="bogus")) + + +def test_enum_validation_rejects_bad_practice_area_external(patched, tmp_path): + with pytest.raises(ValueError, match="practice_area"): + _run(precedent_library.ingest_precedent( + file_path=_make_pdf(tmp_path), citation="עע\"מ 1/20", practice_area="bogus")) + + +def test_external_citation_guard_still_blocks_arar(patched, tmp_path): + with pytest.raises(ValueError, match="ערר"): + _run(precedent_library.ingest_precedent( + file_path=_make_pdf(tmp_path), citation="ערר 1234/24")) + + +def test_internal_text_path_works_without_file(patched): + out = _run(internal_decisions.ingest_internal_decision( + case_number="8046/24", text="t", chair_name="x", practice_area="betterment_levy")) + assert out["status"] == "completed" + assert out["case_law_id"] + + +def test_internal_requires_file_or_text(patched): + with pytest.raises(ValueError, match="file_path or text"): + _run(internal_decisions.ingest_internal_decision( + case_number="8046/24", chair_name="x", practice_area="betterment_levy")) + + +def test_display_name_fallback_uses_canonical_id(patched, tmp_path): + _run(internal_decisions.ingest_internal_decision( + case_number="8046/24", text="t", chair_name="x", practice_area="betterment_levy")) + kind, kw = patched["create"][0] + assert kw["case_name"] == "8046/24", "missing case_name falls back to canonical id" +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_unified_ingest.py -v` +Expected: FAIL — `ModuleNotFoundError: No module named 'legal_mcp.services.ingest'` (or ImportError). + +- [ ] **Step 3: Commit the red tests** + +```bash +cd ~/legal-ai +git add mcp-server/tests/test_unified_ingest.py +git commit -m "test(ingest): failing tests for unified pipeline (FU-1)" +``` + +--- + +## Task 2: Canonical module `ingest.py` — IntakeSpec + shared helpers + +**Files:** +- Create: `mcp-server/src/legal_mcp/services/ingest.py` + +- [ ] **Step 1: Write the module header, IntakeSpec, and shared helpers** + +```python +"""Canonical ingest pipeline (FU-1). + +One pipeline for all sibling-entity intake types (external precedent, +internal committee decision). Per-type variation rides on an ``IntakeSpec`` +config object — never a parallel function. See +docs/spec/01-ingest.md and docs/superpowers/specs/2026-05-30-fu1-unified-ingest-design.md. + +claude_session rule preserved: this module only QUEUES extraction +(``request_*_extraction`` = pure DB writes). It never imports +halacha_extractor / precedent_metadata_extractor, so it is safe to call +from the FastAPI container where the ``claude`` CLI is unavailable. +""" +from __future__ import annotations + +import asyncio +import logging +import re +import shutil +from dataclasses import dataclass +from datetime import date +from pathlib import Path +from typing import Awaitable, Callable +from uuid import UUID, uuid4 + +from legal_mcp import config +from legal_mcp.services import chunker, db, embeddings, extractor + +logger = logging.getLogger(__name__) + +ProgressCb = Callable[[str, int, str], Awaitable[None]] + + +async def _noop_progress(_status: str, _percent: int, _msg: str) -> None: + return None + + +@dataclass(frozen=True) +class IntakeSpec: + """Describes everything that varies between intake types.""" + source_kind: str + id_field: str + staging_root: Path + staging_subdir: Callable[[dict], str] + validate: Callable[[dict], None] + enum_fields: dict[str, frozenset[str]] + derive: Callable[[dict], dict] + display_name_fallback: str + create_record: Callable[..., Awaitable[dict]] + + +def _coerce_date(value) -> date | None: + if value is None or value == "": + return None + if isinstance(value, date): + return value + if isinstance(value, str): + try: + return date.fromisoformat(value[:10]) + except ValueError: + return None + return None + + +def _safe_filename(name: str) -> str: + base = Path(name).name + return re.sub(r"[^\w.\-+א-ת ]", "_", base) or f"upload-{uuid4().hex[:8]}" + + +def _stage_file(src_path: Path, root: Path, subdir: str) -> Path: + dest_dir = root / (subdir or "other") + dest_dir.mkdir(parents=True, exist_ok=True) + dest = dest_dir / f"{uuid4().hex[:8]}_{_safe_filename(src_path.name)}" + shutil.copy2(src_path, dest) + return dest + + +def _validate_enums(spec: IntakeSpec, inputs: dict) -> None: + for field_name, allowed in spec.enum_fields.items(): + value = inputs.get(field_name, "") or "" + if value not in allowed: + raise ValueError(f"invalid {field_name}: {value!r}") +``` + +- [ ] **Step 2: Add the multimodal page-embed helper (moved verbatim from precedent_library.py)** + +```python +async def _embed_pages(case_law_id: UUID, pdf_path: Path, page_count: int) -> dict: + """Render PDF pages → embed via voyage-multimodal → store. Non-fatal caller.""" + thumb_dir = spec_thumb_dir(case_law_id) + rendered = await asyncio.to_thread( + extractor.render_pages_for_multimodal, + pdf_path, config.MULTIMODAL_DPI, config.MULTIMODAL_THUMB_DPI, thumb_dir, + ) + images = [pil for pil, _ in rendered] + thumbs = [t for _, t in rendered] + img_embs = await embeddings.embed_images(images) + + page_records = [] + for i, (emb, thumb) in enumerate(zip(img_embs, thumbs)): + rel_thumb = None + if thumb is not None: + try: + rel_thumb = str(thumb.relative_to(config.DATA_DIR)) + except ValueError: + rel_thumb = str(thumb) + page_records.append({ + "page_number": i + 1, "embedding": emb, "image_thumbnail_path": rel_thumb, + }) + stored = await db.store_precedent_image_embeddings( + case_law_id, page_records, model_name=config.MULTIMODAL_MODEL, + ) + logger.info("Multimodal: stored %d page-image embeddings for case_law %s", stored, case_law_id) + return {"pages_embedded": stored} + + +def spec_thumb_dir(case_law_id: UUID) -> Path: + """Thumbnails live under the precedent-library tree regardless of intake type.""" + return Path(config.DATA_DIR) / "precedent-library" / "thumbnails" / str(case_law_id) +``` + +- [ ] **Step 3: Verify the module imports cleanly** + +Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -c "from legal_mcp.services import ingest; print(ingest.IntakeSpec.__name__)"` +Expected: prints `IntakeSpec`, no error. + +- [ ] **Step 4: Commit** + +```bash +cd ~/legal-ai +git add mcp-server/src/legal_mcp/services/ingest.py +git commit -m "feat(ingest): IntakeSpec + shared helpers for canonical pipeline (FU-1)" +``` + +--- + +## Task 3: Canonical `ingest_document` + +**Files:** +- Modify: `mcp-server/src/legal_mcp/services/ingest.py` (append `ingest_document`) + +- [ ] **Step 1: Append the canonical pipeline function** + +```python +async def ingest_document( + spec: IntakeSpec, + *, + inputs: dict, + file_path: str | Path | None = None, + text: str | None = None, + document_id: UUID | None = None, + progress: ProgressCb | None = None, +) -> dict: + """Run the canonical 12-step pipeline for one intake item. + + ``inputs`` carries the type-specific record fields (citation/case_number, + case_name, court, practice_area, etc.). ``spec`` decides how they are + validated, staged, derived, and which DB-create runs. Returns a dict with + at least: status, case_law_id, chunks. + """ + progress = progress or _noop_progress + + # Step 1: input validation (type-specific) + enums (uniform mechanism). + if not file_path and text is None: + raise ValueError("either file_path or text is required") + spec.validate(inputs) + _validate_enums(spec, inputs) + + # Step 2: field derivation (identity for external). + inputs = {**inputs, **spec.derive(inputs)} + + # Steps 3-5: stage (if file) + extract + strip. + page_count = 0 + page_offsets = None + staged: Path | None = None + if file_path: + src = Path(file_path) + if not src.is_file(): + raise FileNotFoundError(f"file not found: {src}") + await progress("staging", 5, "מעתיק את הקובץ לאחסון") + staged = _stage_file(src, spec.staging_root, spec.staging_subdir(inputs)) + await progress("extracting", 15, "מחלץ טקסט מהקובץ") + try: + raw_text, page_count, page_offsets = await extractor.extract_text(str(staged)) + except Exception as e: + await progress("failed", 100, f"כשל בחילוץ טקסט: {e}") + raise + raw_text = extractor.strip_nevo_preamble((raw_text or "")).strip() + else: + raw_text = (text or "").strip() + if not raw_text: + await progress("failed", 100, "לא נמצא טקסט בקובץ") + raise ValueError("no extractable text in file") + + # Step 6: DB create (type-specific, routed — get case_law_id). + await progress("storing_metadata", 25, "שומר את הרשומה במסד הנתונים") + display_name = (inputs.get("case_name") or "").strip() or ( + inputs.get(spec.display_name_fallback) or "" + ).strip() + record = await spec.create_record( + full_text=raw_text, + case_name=display_name, + decision_date=_coerce_date(inputs.get("decision_date")), + document_id=document_id, + **{k: v for k, v in inputs.items() + if k not in {"case_name", "decision_date", "file_path", "text"}}, + ) + case_law_id = UUID(str(record["id"])) + + try: + stored_chunks = await _chunk_embed_store(case_law_id, raw_text, page_offsets, page_count, progress) + + # Step 9: multimodal — uniform: flag + PDF + page_count, NOT intake type. + if (config.MULTIMODAL_ENABLED and page_count > 0 + and staged is not None and staged.suffix.lower() == ".pdf"): + try: + await progress("embedding_images", 70, f"מטמיע {page_count} עמודי תמונה (multimodal)") + await _embed_pages(case_law_id, staged, page_count) + except Exception as e: + logger.warning("Multimodal embedding failed (non-fatal): %s", e) + + # Steps 10-12: queue BOTH extractions (GAP-02 fix) + statuses. + await db.set_case_law_extraction_status(case_law_id, "completed") + await db.set_case_law_halacha_status(case_law_id, "pending") + await db.request_metadata_extraction(case_law_id) + await db.request_halacha_extraction(case_law_id) + + await progress("completed", 100, + f"נקלט: {stored_chunks} chunks. חילוץ הלכות ומטא-דאטה ממתינים בתור.") + return { + "status": "completed", + "case_law_id": str(case_law_id), + "chunks": stored_chunks, + "halachot": 0, + "halachot_pending": True, + "metadata_filled": [], + "pages": page_count, + } + except Exception as e: + logger.exception("ingest_document failed (%s): %s", spec.source_kind, e) + await db.set_case_law_extraction_status(case_law_id, "failed") + await progress("failed", 100, f"כשל בעיבוד: {e}") + raise + + +async def _chunk_embed_store(case_law_id, text, page_offsets, page_count, progress) -> int: + """Steps 7-8: chunk (hierarchical/flat by flag) → embed children → store.""" + if config.PARENT_DOC_RETRIEVAL_ENABLED: + await progress("chunking", 40, f"מחלק את הטקסט ל-chunks היררכיים ({page_count} עמ')") + h_chunks = chunker.chunk_document_hierarchical(text, page_offsets=page_offsets) + if not h_chunks: + return 0 + children = [c for c in h_chunks if c.role == "child"] + parents = [c for c in h_chunks if c.role == "parent"] + await progress("embedding", 55, f"מייצר embeddings ל-{len(children)} children ({len(parents)} parents)") + child_vectors = await embeddings.embed_texts([c.content for c in children], input_type="document") + chunk_dicts: list[dict] = [] + for p in parents: + chunk_dicts.append({ + "role": "parent", "local_id": p.local_id, "parent_local_id": None, + "chunk_index": p.chunk_index, "content": p.content, + "section_type": p.section_type, "page_number": p.page_number, "embedding": None, + }) + for c, v in zip(children, child_vectors): + chunk_dicts.append({ + "role": "child", "local_id": c.local_id, "parent_local_id": c.parent_local_id, + "chunk_index": c.chunk_index, "content": c.content, + "section_type": c.section_type, "page_number": c.page_number, "embedding": v, + }) + counts = await db.store_precedent_chunks_hierarchical(case_law_id, chunk_dicts) + return counts["children"] + else: + await progress("chunking", 40, f"מחלק את הטקסט ל-chunks ({page_count} עמ')") + chunks = chunker.chunk_document(text, page_offsets=page_offsets) + if not chunks: + return 0 + await progress("embedding", 55, f"מייצר embeddings ל-{len(chunks)} chunks") + chunk_vectors = await embeddings.embed_texts([c.content for c in chunks], input_type="document") + chunk_dicts = [ + {"chunk_index": c.chunk_index, "content": c.content, + "section_type": c.section_type, "page_number": c.page_number, "embedding": v} + for c, v in zip(chunks, chunk_vectors) + ] + return await db.store_precedent_chunks(case_law_id, chunk_dicts) +``` + +- [ ] **Step 2: Verify import** + +Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -c "from legal_mcp.services import ingest; print(ingest.ingest_document.__name__)"` +Expected: prints `ingest_document`. + +- [ ] **Step 3: Commit** + +```bash +cd ~/legal-ai +git add mcp-server/src/legal_mcp/services/ingest.py +git commit -m "feat(ingest): canonical ingest_document pipeline (FU-1)" +``` + +> **Note on `create_record` kwargs:** the wrappers (Tasks 4-5) build `inputs` so the +> leftover keys after popping `case_name`/`decision_date`/`file_path`/`text` exactly match +> each DB-create's remaining parameters. Verify against the signatures: +> `create_external_case_law(case_number, full_text, court, practice_area, appeal_subtype, subject_tags, summary, headnote, source_type, precedent_level, is_binding, ...)` +> and `create_internal_committee_decision(case_number, full_text, court, chair_name, district, practice_area, appeal_subtype, subject_tags, summary, is_binding, proceeding_type, ...)`. + +--- + +## Task 4: External spec + rewrite `ingest_precedent` as wrapper + +**Files:** +- Modify: `mcp-server/src/legal_mcp/services/precedent_library.py` + +- [ ] **Step 1: Replace the top-of-file ingest section with a spec + wrapper** + +Replace the body of `ingest_precedent` (lines ~88-317) and remove `_stage_file`, `_coerce_date`, +`_safe_filename`, `_embed_precedent_pages`, and the `_VALID_*` constants used only by ingest. +Keep `_VALID_PRACTICE_AREAS`/`_VALID_SOURCE_TYPES` values but move them into the spec. Add: + +```python +from legal_mcp.services import ingest + +PRECEDENT_LIBRARY_DIR = Path(config.DATA_DIR) / "precedent-library" + +_VALID_PRACTICE_AREAS = frozenset({"", "rishuy_uvniya", "betterment_levy", "compensation_197"}) +_VALID_SOURCE_TYPES = frozenset({"", "court_ruling", "appeals_committee"}) + + +def _external_validate(inputs: dict) -> None: + citation = (inputs.get("citation") or "").strip() + if not citation: + raise ValueError("citation is required") + if citation.startswith(("ערר ", "ערר(", 'בל"מ ', 'בל"מ(', "ARAR ")): + raise ValueError( + "ציטוט שמתחיל ב-'ערר' או 'בל\"מ' הוא החלטת ועדת ערר. " + "השתמש ב-internal_decision_upload (דורש chair_name + district), " + "לא ב-precedent_library_upload." + ) + + +def _external_staging_subdir(inputs: dict) -> str: + st = inputs.get("source_type") or "" + return st if st in {"court_ruling", "appeals_committee"} else "other" + + +_EXTERNAL_SPEC = ingest.IntakeSpec( + source_kind="external_upload", + id_field="citation", + staging_root=PRECEDENT_LIBRARY_DIR, + staging_subdir=_external_staging_subdir, + validate=_external_validate, + enum_fields={"practice_area": _VALID_PRACTICE_AREAS, "source_type": _VALID_SOURCE_TYPES}, + derive=lambda inputs: {}, + display_name_fallback="citation", + create_record=_create_external_record, +) + + +async def _create_external_record(**kw) -> dict: + """Adapter: maps canonical inputs (citation) to create_external_case_law(case_number).""" + return await db.create_external_case_law( + case_number=kw["citation"].strip(), + case_name=kw["case_name"], + full_text=kw["full_text"], + court=(kw.get("court") or "").strip(), + decision_date=kw.get("decision_date"), + practice_area=kw.get("practice_area", ""), + appeal_subtype=(kw.get("appeal_subtype") or "").strip(), + subject_tags=list(kw.get("subject_tags") or []), + summary=(kw.get("summary") or "").strip(), + headnote=(kw.get("headnote") or "").strip(), + source_type=kw.get("source_type", ""), + precedent_level=kw.get("precedent_level", ""), + is_binding=kw.get("is_binding", True), + document_id=kw.get("document_id"), + ) + + +async def ingest_precedent( + *, + file_path: str | Path, + citation: str, + case_name: str = "", + court: str = "", + decision_date=None, + source_type: str = "", + precedent_level: str = "", + practice_area: str = "", + appeal_subtype: str = "", + subject_tags: list[str] | None = None, + is_binding: bool = True, + headnote: str = "", + summary: str = "", + document_id: UUID | None = None, + progress: ingest.ProgressCb | None = None, +) -> dict: + """Ingest one external precedent. Thin wrapper over the canonical pipeline.""" + inputs = { + "citation": citation, "case_name": case_name, "court": court, + "decision_date": decision_date, "source_type": source_type, + "precedent_level": precedent_level, "practice_area": practice_area, + "appeal_subtype": appeal_subtype, "subject_tags": subject_tags, + "is_binding": is_binding, "headnote": headnote, "summary": summary, + } + return await ingest.ingest_document( + _EXTERNAL_SPEC, inputs=inputs, file_path=file_path, + document_id=document_id, progress=progress, + ) +``` + +> Define `_create_external_record` ABOVE `_EXTERNAL_SPEC` (Python resolves the name at +> dataclass-construction time). Reorder if needed. + +- [ ] **Step 2: Run external-path tests** + +Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_unified_ingest.py -k "external" -v` +Expected: `test_external_queues_both`, `test_enum_validation_rejects_bad_practice_area_external`, +`test_external_citation_guard_still_blocks_arar` PASS. + +- [ ] **Step 3: Commit** + +```bash +cd ~/legal-ai +git add mcp-server/src/legal_mcp/services/precedent_library.py +git commit -m "refactor(ingest): ingest_precedent delegates to canonical pipeline (FU-1)" +``` + +--- + +## Task 5: Internal spec + rewrite `ingest_internal_decision` as wrapper + +**Files:** +- Modify: `mcp-server/src/legal_mcp/services/internal_decisions.py` + +- [ ] **Step 1: Replace the ingest section with a spec + wrapper** + +Remove `_coerce_date`, `_safe_filename`, and the inline pipeline body of +`ingest_internal_decision` (lines ~73-220). Keep `_VALID_DISTRICTS`, `_COURT_TO_DISTRICT`, +`_district_from_court`, and all migrate_*/enrich_*/search_internal functions. Add: + +```python +from legal_mcp.services import ingest + +INTERNAL_DECISIONS_DIR = Path(config.DATA_DIR) / "internal-decisions" + +_VALID_PRACTICE_AREAS = frozenset({"", "rishuy_uvniya", "betterment_levy", "compensation_197"}) +_VALID_DISTRICTS = frozenset({"", "ירושלים", "מרכז", "תל אביב", "צפון", "דרום", "ארצי"}) + + +def _internal_validate(inputs: dict) -> None: + if not (inputs.get("case_number") or "").strip(): + raise ValueError("case_number is required") + + +def _internal_derive(inputs: dict) -> dict: + district = (inputs.get("district") or "").strip() or _district_from_court(inputs.get("court") or "") + proc = (inputs.get("proceeding_type") or "").strip() or derive_proceeding_type( + appeal_subtype=inputs.get("appeal_subtype") or "", subject=inputs.get("case_name") or "", + ) + return {"district": district, "proceeding_type": proc} + + +async def _create_internal_record(**kw) -> dict: + return await db.create_internal_committee_decision( + case_number=kw["case_number"].strip(), + case_name=kw["case_name"], + full_text=kw["full_text"], + court=(kw.get("court") or "").strip(), + decision_date=kw.get("decision_date"), + chair_name=(kw.get("chair_name") or "").strip(), + district=kw.get("district", ""), + practice_area=kw.get("practice_area", ""), + appeal_subtype=(kw.get("appeal_subtype") or "").strip(), + subject_tags=list(kw.get("subject_tags") or []), + summary=(kw.get("summary") or "").strip(), + is_binding=kw.get("is_binding", True), + document_id=kw.get("document_id"), + proceeding_type=kw.get("proceeding_type") or "ערר", + ) + + +_INTERNAL_SPEC = ingest.IntakeSpec( + source_kind="internal_committee", + id_field="case_number", + staging_root=INTERNAL_DECISIONS_DIR, + staging_subdir=lambda inputs: (inputs.get("district") or "other"), + validate=_internal_validate, + enum_fields={"practice_area": _VALID_PRACTICE_AREAS, "district": _VALID_DISTRICTS}, + derive=_internal_derive, + display_name_fallback="case_number", + create_record=_create_internal_record, +) + + +async def ingest_internal_decision( + *, + case_number: str, + case_name: str = "", + court: str = "", + decision_date=None, + chair_name: str = "", + district: str = "", + practice_area: str = "", + appeal_subtype: str = "", + subject_tags: list[str] | None = None, + summary: str = "", + is_binding: bool = True, + file_path: str | Path | None = None, + text: str | None = None, + document_id: UUID | None = None, + queue_halachot: bool = True, # retained for signature compat; pipeline always queues + proceeding_type: str = "", +) -> dict: + """Ingest one appeals-committee decision. Thin wrapper over the canonical pipeline.""" + inputs = { + "case_number": case_number, "case_name": case_name, "court": court, + "decision_date": decision_date, "chair_name": chair_name, "district": district, + "practice_area": practice_area, "appeal_subtype": appeal_subtype, + "subject_tags": subject_tags, "summary": summary, "is_binding": is_binding, + "proceeding_type": proceeding_type, + } + out = await ingest.ingest_document( + _INTERNAL_SPEC, inputs=inputs, file_path=file_path, text=text, + document_id=document_id, + ) + return {"status": out["status"], "case_law_id": out["case_law_id"], + "chunks": out["chunks"], "halachot_pending": True} +``` + +> `queue_halachot=False` was only used by `migrate_from_style_corpus`. The canonical pipeline +> always queues both (per INV-ING3). Confirm with the user during execution that bulk +> re-migration queueing is acceptable; the migrate path is out of FU-1 scope but calls this +> wrapper. If suppression is still required, that is a follow-up — note it, do not silently drop. + +- [ ] **Step 2: Run the full test file** + +Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_unified_ingest.py -v` +Expected: ALL 9 tests PASS — including `test_internal_queues_BOTH_metadata_and_halacha` (GAP-02). + +- [ ] **Step 3: Commit** + +```bash +cd ~/legal-ai +git add mcp-server/src/legal_mcp/services/internal_decisions.py +git commit -m "refactor(ingest): ingest_internal_decision delegates to canonical pipeline; queue metadata too (GAP-02, FU-1)" +``` + +--- + +## Task 6: Dead-code sweep, smoke import, full suite + +**Files:** +- Verify: `mcp-server/src/legal_mcp/services/precedent_library.py`, `internal_decisions.py` + +- [ ] **Step 1: Confirm no orphaned references to removed helpers** + +Run: `cd ~/legal-ai/mcp-server && grep -rn "_embed_precedent_pages\|_stage_file\|_safe_filename\|_coerce_date" src/legal_mcp/services/precedent_library.py src/legal_mcp/services/internal_decisions.py` +Expected: NO matches (all moved to `ingest.py`). If any remain in code paths other than ingest, leave them; if orphaned, delete. + +- [ ] **Step 2: Smoke-import every affected module + its callers** + +Run: +```bash +cd ~/legal-ai/mcp-server && .venv/bin/python -c " +from legal_mcp.services import ingest, precedent_library, internal_decisions +from legal_mcp.tools import precedent_library as t1, internal_decisions as t2 +import inspect +sig_p = inspect.signature(precedent_library.ingest_precedent) +sig_i = inspect.signature(internal_decisions.ingest_internal_decision) +assert 'citation' in sig_p.parameters and 'file_path' in sig_p.parameters +assert 'case_number' in sig_i.parameters and 'text' in sig_i.parameters +print('signatures preserved; imports clean') +" +``` +Expected: prints `signatures preserved; imports clean`. + +- [ ] **Step 3: Run the entire test suite (no regressions elsewhere)** + +Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/ -q` +Expected: all pre-existing tests still pass + the 9 new ones. + +- [ ] **Step 4: Lint the changed files (match repo style)** + +Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m ruff check src/legal_mcp/services/ingest.py src/legal_mcp/services/precedent_library.py src/legal_mcp/services/internal_decisions.py 2>/dev/null || echo "ruff not configured — skip"` +Expected: clean, or "skip". + +- [ ] **Step 5: Update TaskMaster #59 → done** + +Mark subtasks 59.1-59.4 and task 59 as done via task-master (verify via MCP get_task). + +- [ ] **Step 6: Final commit** + +```bash +cd ~/legal-ai +git add -A mcp-server/ +git commit -m "chore(ingest): dead-code sweep + smoke checks for unified pipeline (FU-1)" +``` + +--- + +## Self-Review Notes + +- **GAP-01** (single path) → Tasks 2-5. **GAP-02** (metadata queue) → Task 3 step 1 + test `test_internal_queues_BOTH_metadata_and_halacha`. **GAP-04** (enum validation) → `_validate_enums` + tests. **GAP-05** (staging/derive/multimodal/fallback/guard unified) → Task 3 + specs in Tasks 4-5. +- **Boundary preserved:** DB-create functions untouched (routed via `create_record`); no migration. +- **Open execution check:** `queue_halachot=False` suppression in `migrate_from_style_corpus` (Task 5 note) — surface to user, do not silently change bulk-migration behavior. +- **claude_session rule:** `ingest.py` imports only db/chunker/embeddings/extractor — no LLM extractors. Safe for container.