FU-1 Unified Ingest Path — Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.
Goal: Collapse the two parallel ingest functions (ingest_precedent, ingest_internal_decision) into one canonical pipeline parameterized by an IntakeSpec, closing GAP-01/02/04/05.
Architecture: New module services/ingest.py holds a Template-Method skeleton ingest_document(spec, ...); per-type variation rides on a frozen IntakeSpec config object (staging resolver, validate callable, enum_fields data, derive callable, display-name fallback, injected create_record). The two existing public functions stay as named entry points that build a spec and delegate. The DB-create functions are NOT merged (FU-2 boundary) — only routed via spec.create_record.
Tech Stack: Python 3.12, asyncpg, pytest (offline, monkeypatched I/O), local .venv at mcp-server/.venv.
Spec: docs/superpowers/specs/2026-05-30-fu1-unified-ingest-design.md
Run tests with: cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_unified_ingest.py -v
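The Template-Method + config-object split can be illustrated with a stripped-down, synchronous sketch. ToySpec, toy_ingest, and _require are hypothetical names used only for illustration. The real IntakeSpec and ingest_document (Tasks 2-3) carry more fields, are async, and write to the database.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class ToySpec:
    """Hypothetical, trimmed-down stand-in for IntakeSpec (not the real class)."""
    source_kind: str
    validate: Callable[[dict], None]
    derive: Callable[[dict], dict]
    display_name_fallback: str


def toy_ingest(spec: ToySpec, inputs: dict) -> dict:
    """Fixed skeleton: every intake type runs the same steps in the same order."""
    spec.validate(inputs)                        # varies per type
    inputs = {**inputs, **spec.derive(inputs)}   # varies per type
    name = (inputs.get("case_name") or "").strip() or inputs[spec.display_name_fallback]
    # Shared tail: every type gets both extraction jobs queued.
    return {"kind": spec.source_kind, "name": name, "queued": ["metadata", "halacha"]}


def _require(field: str) -> Callable[[dict], None]:
    """Build a validator that rejects a missing or blank field."""
    def check(inputs: dict) -> None:
        if not (inputs.get(field) or "").strip():
            raise ValueError(f"{field} is required")
    return check


EXTERNAL = ToySpec("external_upload", _require("citation"), lambda i: {}, "citation")
INTERNAL = ToySpec(
    "internal_committee",
    _require("case_number"),
    lambda i: {"district": i.get("district") or "other"},
    "case_number",
)
```

This illustrates the design choice: adding a third intake type means writing one more spec object, not a third copy of the pipeline. Anything placed in the shared tail, such as queueing both extractions, applies to every type automatically.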
File Structure
- Create mcp-server/src/legal_mcp/services/ingest.py: canonical pipeline + IntakeSpec + shared helpers (_stage_file, _coerce_date, _safe_filename, _embed_pages).
- Create mcp-server/tests/test_unified_ingest.py: offline behavioral tests.
- Modify mcp-server/src/legal_mcp/services/precedent_library.py: ingest_precedent becomes a thin wrapper that builds _EXTERNAL_SPEC; delete the inline pipeline and the moved helpers; keep everything else (search, reextract, process_pending, list, delete, get).
- Modify mcp-server/src/legal_mcp/services/internal_decisions.py: ingest_internal_decision becomes a thin wrapper that builds _INTERNAL_SPEC; delete the inline pipeline and the moved helpers; keep the migrate_*, enrich_*, and search_internal functions.
Unchanged callers (verify, don't edit): tools/precedent_library.py, tools/internal_decisions.py, web/ HTTP handlers — they call the two public functions whose signatures are preserved.
Task 1: Failing tests for the unified pipeline
Files:
- Test: mcp-server/tests/test_unified_ingest.py
- Step 1: Write the failing tests
"""FU-1: unified ingest pipeline tests (offline, all I/O monkeypatched).

Proves both intake types flow through services.ingest.ingest_document and that
the canonical pipeline is symmetric: BOTH metadata and halacha extraction are
queued for BOTH types (GAP-02 regression), enum validation applies to both
(GAP-04), multimodal is gated by flag+PDF not by intake type (GAP-05), and the
external citation guard is preserved.
"""
from __future__ import annotations

import asyncio
from pathlib import Path
from uuid import uuid4

import pytest

from legal_mcp import config
from legal_mcp.services import db, embeddings, chunker, extractor
from legal_mcp.services import ingest, precedent_library, internal_decisions


def _run(coro):
    return asyncio.run(coro)


class _Chunk:
    def __init__(self, i):
        self.chunk_index = i
        self.content = f"chunk-{i}"
        self.section_type = "body"
        self.page_number = 1
        self.role = "child"
        self.local_id = f"c{i}"
        self.parent_local_id = None


@pytest.fixture()
def patched(monkeypatch, tmp_path):
    """Patch every I/O boundary. Record queue + create calls."""
    calls = {"metadata": [], "halacha": [], "create": [], "chunks": [], "pages": []}

    async def _extract_text(path):
        return ("full decision text", 2, [0, 100])

    def _strip(text):
        return text

    def _chunk(text, page_offsets=None):
        return [_Chunk(0), _Chunk(1)]

    async def _embed(texts, input_type="document"):
        return [[0.0] * 8 for _ in texts]

    async def _store_chunks(cid, dicts):
        calls["chunks"].append((cid, len(dicts)))
        return len(dicts)

    async def _create_external(**kw):
        calls["create"].append(("external", kw))
        return {"id": uuid4()}

    async def _create_internal(**kw):
        calls["create"].append(("internal", kw))
        return {"id": uuid4()}

    async def _req_meta(cid):
        calls["metadata"].append(cid)

    async def _req_hal(cid):
        calls["halacha"].append(cid)

    async def _set_status(cid, status):
        return None

    def _stage(src, root, subdir):
        # The specs bind staging_root to config.DATA_DIR at import time, so without
        # this patch the external tests would copy files into the real data dir.
        dest = tmp_path / "staged" / (subdir or "other") / Path(src).name
        dest.parent.mkdir(parents=True, exist_ok=True)
        dest.write_bytes(Path(src).read_bytes())
        return dest

    monkeypatch.setattr(extractor, "extract_text", _extract_text)
    monkeypatch.setattr(extractor, "strip_nevo_preamble", _strip)
    monkeypatch.setattr(chunker, "chunk_document", _chunk)
    monkeypatch.setattr(embeddings, "embed_texts", _embed)
    monkeypatch.setattr(db, "store_precedent_chunks", _store_chunks)
    monkeypatch.setattr(db, "create_external_case_law", _create_external)
    monkeypatch.setattr(db, "create_internal_committee_decision", _create_internal)
    monkeypatch.setattr(db, "request_metadata_extraction", _req_meta)
    monkeypatch.setattr(db, "request_halacha_extraction", _req_hal)
    monkeypatch.setattr(db, "set_case_law_extraction_status", _set_status)
    monkeypatch.setattr(db, "set_case_law_halacha_status", _set_status)
    monkeypatch.setattr(ingest, "_stage_file", _stage)
    # Force flat chunking + multimodal OFF unless a test flips it.
    monkeypatch.setattr(config, "PARENT_DOC_RETRIEVAL_ENABLED", False)
    monkeypatch.setattr(config, "MULTIMODAL_ENABLED", False)
    return calls


def _make_pdf(tmp_path) -> str:
    p = tmp_path / "decision.pdf"
    p.write_bytes(b"%PDF-1.4 fake")
    return str(p)


def test_internal_queues_BOTH_metadata_and_halacha(patched, tmp_path):
    """GAP-02 regression: the internal path must queue metadata too."""
    _run(internal_decisions.ingest_internal_decision(
        case_number="8046/24", text="decision text", chair_name="דפנה תמיר",
        district="ירושלים", practice_area="betterment_levy",
    ))
    assert len(patched["metadata"]) == 1, "internal path must queue metadata (GAP-02)"
    assert len(patched["halacha"]) == 1


def test_external_queues_both(patched, tmp_path):
    _run(precedent_library.ingest_precedent(
        file_path=_make_pdf(tmp_path), citation="עע\"מ 1234/20",
        practice_area="rishuy_uvniya", source_type="court_ruling",
    ))
    assert len(patched["metadata"]) == 1
    assert len(patched["halacha"]) == 1


def test_both_types_go_through_ingest_document(patched, tmp_path, monkeypatch):
    seen = []
    real = ingest.ingest_document

    async def _spy(spec, **kw):
        seen.append(spec.source_kind)
        return await real(spec, **kw)

    monkeypatch.setattr(ingest, "ingest_document", _spy)
    _run(internal_decisions.ingest_internal_decision(
        case_number="8046/24", text="t", chair_name="דפנה תמיר", practice_area="betterment_levy"))
    _run(precedent_library.ingest_precedent(
        file_path=_make_pdf(tmp_path), citation="עע\"מ 1/20", practice_area="rishuy_uvniya"))
    assert seen == ["internal_committee", "external_upload"]


def test_enum_validation_rejects_bad_practice_area_internal(patched, tmp_path):
    """GAP-04: internal path must validate enums like the external one."""
    with pytest.raises(ValueError, match="practice_area"):
        _run(internal_decisions.ingest_internal_decision(
            case_number="8046/24", text="t", chair_name="x", practice_area="bogus"))


def test_enum_validation_rejects_bad_practice_area_external(patched, tmp_path):
    with pytest.raises(ValueError, match="practice_area"):
        _run(precedent_library.ingest_precedent(
            file_path=_make_pdf(tmp_path), citation="עע\"מ 1/20", practice_area="bogus"))


def test_external_citation_guard_still_blocks_arar(patched, tmp_path):
    with pytest.raises(ValueError, match="ערר"):
        _run(precedent_library.ingest_precedent(
            file_path=_make_pdf(tmp_path), citation="ערר 1234/24"))


def test_internal_text_path_works_without_file(patched):
    out = _run(internal_decisions.ingest_internal_decision(
        case_number="8046/24", text="t", chair_name="x", practice_area="betterment_levy"))
    assert out["status"] == "completed"
    assert out["case_law_id"]


def test_internal_requires_file_or_text(patched):
    with pytest.raises(ValueError, match="file_path or text"):
        _run(internal_decisions.ingest_internal_decision(
            case_number="8046/24", chair_name="x", practice_area="betterment_levy"))


def test_display_name_fallback_uses_canonical_id(patched, tmp_path):
    _run(internal_decisions.ingest_internal_decision(
        case_number="8046/24", text="t", chair_name="x", practice_area="betterment_levy"))
    kind, kw = patched["create"][0]
    assert kind == "internal"
    assert kw["case_name"] == "8046/24", "missing case_name falls back to canonical id"
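The fixture and the spy test only work because production code resolves helpers through the module object at call time (db.create_external_case_law(...), ingest.ingest_document(...)). A name bound by `from module import name` at import time is not affected by monkeypatch.setattr(module, "name", ...). In practice, the wrappers in Tasks 4-5 must call ingest.ingest_document and must not import the function directly. Otherwise test_both_types_go_through_ingest_document would record nothing. The illustration below is self-contained and uses a throwaway module; svc, fetch, via_module, and via_binding are hypothetical names. It uses unittest.mock so it runs without pytest.

```python
import types
from unittest import mock

# Throwaway module standing in for something like legal_mcp.services.db (hypothetical).
svc = types.ModuleType("svc")
svc.fetch = lambda: "real"


def via_module() -> str:
    # Style the plan relies on: resolve svc.fetch at call time.
    return svc.fetch()


# Style to avoid: capture the function object once, as `from svc import fetch` would.
fetch = svc.fetch


def via_binding() -> str:
    return fetch()


# mock.patch.object swaps the module attribute and restores it on exit,
# much like pytest's monkeypatch.setattr does at fixture teardown.
with mock.patch.object(svc, "fetch", lambda: "patched"):
    patched_results = (via_module(), via_binding())

restored_results = (via_module(), via_binding())
```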
- Step 2: Run tests to verify they fail
Run: cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_unified_ingest.py -v
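Expected: red. pytest stops at collection with an ERROR for tests/test_unified_ingest.py: ModuleNotFoundError: No module named 'legal_mcp.services.ingest' (or an ImportError), so none of the tests run yet.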
- Step 3: Commit the red tests
cd ~/legal-ai
git add mcp-server/tests/test_unified_ingest.py
git commit -m "test(ingest): failing tests for unified pipeline (FU-1)"
Task 2: Canonical module ingest.py — IntakeSpec + shared helpers
Files:
- Create: mcp-server/src/legal_mcp/services/ingest.py
- Step 1: Write the module header, IntakeSpec, and shared helpers
"""Canonical ingest pipeline (FU-1).

One pipeline for all sibling-entity intake types (external precedent,
internal committee decision). Per-type variation rides on an ``IntakeSpec``
config object, never a parallel function. See
docs/spec/01-ingest.md and docs/superpowers/specs/2026-05-30-fu1-unified-ingest-design.md.

claude_session rule preserved: this module only QUEUES extraction
(``request_*_extraction`` = pure DB writes). It never imports
halacha_extractor / precedent_metadata_extractor, so it is safe to call
from the FastAPI container where the ``claude`` CLI is unavailable.
"""
from __future__ import annotations

import asyncio
import logging
import re
import shutil
from dataclasses import dataclass
from datetime import date
from pathlib import Path
from typing import Awaitable, Callable
from uuid import UUID, uuid4

from legal_mcp import config
from legal_mcp.services import chunker, db, embeddings, extractor

logger = logging.getLogger(__name__)

ProgressCb = Callable[[str, int, str], Awaitable[None]]


async def _noop_progress(_status: str, _percent: int, _msg: str) -> None:
    return None


@dataclass(frozen=True)
class IntakeSpec:
    """Describes everything that varies between intake types."""
    source_kind: str
    id_field: str
    staging_root: Path
    staging_subdir: Callable[[dict], str]
    validate: Callable[[dict], None]
    enum_fields: dict[str, frozenset[str]]
    derive: Callable[[dict], dict]
    display_name_fallback: str
    create_record: Callable[..., Awaitable[dict]]


def _coerce_date(value) -> date | None:
    if value is None or value == "":
        return None
    if isinstance(value, date):
        return value
    if isinstance(value, str):
        try:
            return date.fromisoformat(value[:10])
        except ValueError:
            return None
    return None


def _safe_filename(name: str) -> str:
    base = Path(name).name
    return re.sub(r"[^\w.\-+א-ת ]", "_", base) or f"upload-{uuid4().hex[:8]}"


def _stage_file(src_path: Path, root: Path, subdir: str) -> Path:
    dest_dir = root / (subdir or "other")
    dest_dir.mkdir(parents=True, exist_ok=True)
    dest = dest_dir / f"{uuid4().hex[:8]}_{_safe_filename(src_path.name)}"
    shutil.copy2(src_path, dest)
    return dest


def _validate_enums(spec: IntakeSpec, inputs: dict) -> None:
    for field_name, allowed in spec.enum_fields.items():
        value = inputs.get(field_name, "") or ""
        if value not in allowed:
            raise ValueError(f"invalid {field_name}: {value!r}")
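Before the wrappers depend on these helpers, it helps to pin down a few edge cases. The sketch below re-declares standalone copies of _coerce_date, _safe_filename, and _validate_enums. They are renamed without the leading underscore so they run without importing legal_mcp. PRACTICE_AREAS and FIELDS are illustrative values that mirror the external spec.

```python
from __future__ import annotations

import re
from datetime import date
from pathlib import Path
from uuid import uuid4


def coerce_date(value) -> date | None:
    """Copy of _coerce_date: ISO prefix parse, anything else becomes None."""
    if value is None or value == "":
        return None
    if isinstance(value, date):
        return value
    if isinstance(value, str):
        try:
            return date.fromisoformat(value[:10])
        except ValueError:
            return None
    return None


def safe_filename(name: str) -> str:
    """Copy of _safe_filename: keep the basename, replace anything outside
    word chars, dot, dash, plus, Hebrew letters and space with "_"."""
    base = Path(name).name
    return re.sub(r"[^\w.\-+א-ת ]", "_", base) or f"upload-{uuid4().hex[:8]}"


def validate_enums(enum_fields: dict[str, frozenset[str]], inputs: dict) -> None:
    """Copy of _validate_enums, taking the enum map directly instead of a spec."""
    for field_name, allowed in enum_fields.items():
        value = inputs.get(field_name, "") or ""
        if value not in allowed:
            raise ValueError(f"invalid {field_name}: {value!r}")


PRACTICE_AREAS = frozenset({"", "rishuy_uvniya", "betterment_levy", "compensation_197"})
FIELDS = {"practice_area": PRACTICE_AREAS}

validate_enums(FIELDS, {})                       # missing -> "" -> accepted
validate_enums(FIELDS, {"practice_area": None})  # None -> "" -> accepted
validate_enums(FIELDS, {"practice_area": "betterment_levy"})
```

Two behaviors follow from this. First, because every allowed set contains "", an omitted or None enum field passes validation. Second, a date string that is not ISO-formatted is silently coerced to None rather than rejected.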
- Step 2: Add the multimodal page-embed helper (moved from precedent_library._embed_precedent_pages and renamed to _embed_pages)
def spec_thumb_dir(case_law_id: UUID) -> Path:
    """Thumbnails live under the precedent-library tree regardless of intake type."""
    return Path(config.DATA_DIR) / "precedent-library" / "thumbnails" / str(case_law_id)


async def _embed_pages(case_law_id: UUID, pdf_path: Path, page_count: int) -> dict:
    """Render PDF pages → embed via voyage-multimodal → store.

    The caller treats any exception raised here as non-fatal.
    """
    thumb_dir = spec_thumb_dir(case_law_id)
    rendered = await asyncio.to_thread(
        extractor.render_pages_for_multimodal,
        pdf_path, config.MULTIMODAL_DPI, config.MULTIMODAL_THUMB_DPI, thumb_dir,
    )
    images = [pil for pil, _ in rendered]
    thumbs = [t for _, t in rendered]
    img_embs = await embeddings.embed_images(images)
    page_records = []
    for i, (emb, thumb) in enumerate(zip(img_embs, thumbs)):
        rel_thumb = None
        if thumb is not None:
            try:
                rel_thumb = str(thumb.relative_to(config.DATA_DIR))
            except ValueError:
                rel_thumb = str(thumb)
        page_records.append({
            "page_number": i + 1, "embedding": emb, "image_thumbnail_path": rel_thumb,
        })
    stored = await db.store_precedent_image_embeddings(
        case_law_id, page_records, model_name=config.MULTIMODAL_MODEL,
    )
    logger.info("Multimodal: stored %d page-image embeddings for case_law %s", stored, case_law_id)
    return {"pages_embedded": stored}
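Each page's stored thumbnail path is made relative to config.DATA_DIR when the file sits under that directory, and falls back to the absolute path otherwise. The likely intent is that records survive a relocated data directory. The sketch below isolates that branch. It uses PurePosixPath so nothing touches the filesystem, and rel_thumb_path, the /data root, and the file names are all hypothetical.

```python
from __future__ import annotations

from pathlib import PurePosixPath


def rel_thumb_path(thumb: PurePosixPath | None, data_dir: PurePosixPath) -> str | None:
    """Same branch as in _embed_pages: relative to DATA_DIR if possible, else absolute."""
    if thumb is None:
        return None
    try:
        return str(thumb.relative_to(data_dir))
    except ValueError:
        # relative_to raises ValueError when thumb is not under data_dir.
        return str(thumb)
```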
- Step 3: Verify the module imports cleanly
Run: cd ~/legal-ai/mcp-server && .venv/bin/python -c "from legal_mcp.services import ingest; print(ingest.IntakeSpec.__name__)"
Expected: prints IntakeSpec, no error.
- Step 4: Commit
cd ~/legal-ai
git add mcp-server/src/legal_mcp/services/ingest.py
git commit -m "feat(ingest): IntakeSpec + shared helpers for canonical pipeline (FU-1)"
Task 3: Canonical ingest_document
Files:
- Modify: mcp-server/src/legal_mcp/services/ingest.py (append ingest_document)
- Step 1: Append the canonical pipeline function
async def ingest_document(
    spec: IntakeSpec,
    *,
    inputs: dict,
    file_path: str | Path | None = None,
    text: str | None = None,
    document_id: UUID | None = None,
    progress: ProgressCb | None = None,
) -> dict:
    """Run the canonical 12-step pipeline for one intake item.

    ``inputs`` carries the type-specific record fields (citation/case_number,
    case_name, court, practice_area, etc.). ``spec`` decides how they are
    validated, staged, derived, and which DB-create runs. Returns a dict with
    at least: status, case_law_id, chunks.
    """
    progress = progress or _noop_progress

    # Step 1: input validation (type-specific) + enums (uniform mechanism).
    if not file_path and text is None:
        raise ValueError("either file_path or text is required")
    spec.validate(inputs)
    _validate_enums(spec, inputs)

    # Step 2: field derivation (identity for external).
    inputs = {**inputs, **spec.derive(inputs)}

    # Steps 3-5: stage (if file) + extract + strip.
    page_count = 0
    page_offsets = None
    staged: Path | None = None
    if file_path:
        src = Path(file_path)
        if not src.is_file():
            raise FileNotFoundError(f"file not found: {src}")
        await progress("staging", 5, "מעתיק את הקובץ לאחסון")
        staged = _stage_file(src, spec.staging_root, spec.staging_subdir(inputs))
        await progress("extracting", 15, "מחלץ טקסט מהקובץ")
        try:
            raw_text, page_count, page_offsets = await extractor.extract_text(str(staged))
        except Exception as e:
            await progress("failed", 100, f"כשל בחילוץ טקסט: {e}")
            raise
        raw_text = extractor.strip_nevo_preamble((raw_text or "")).strip()
    else:
        raw_text = (text or "").strip()
    if not raw_text:
        await progress("failed", 100, "לא נמצא טקסט בקובץ")
        raise ValueError("no extractable text in file")

    # Step 6: DB create (type-specific, routed via spec.create_record; yields case_law_id).
    await progress("storing_metadata", 25, "שומר את הרשומה במסד הנתונים")
    display_name = (inputs.get("case_name") or "").strip() or (
        inputs.get(spec.display_name_fallback) or ""
    ).strip()
    record = await spec.create_record(
        full_text=raw_text,
        case_name=display_name,
        decision_date=_coerce_date(inputs.get("decision_date")),
        document_id=document_id,
        **{k: v for k, v in inputs.items()
           if k not in {"case_name", "decision_date", "file_path", "text"}},
    )
    case_law_id = UUID(str(record["id"]))

    try:
        # Steps 7-8: chunk + embed + store.
        stored_chunks = await _chunk_embed_store(case_law_id, raw_text, page_offsets, page_count, progress)

        # Step 9: multimodal is uniform: gated by flag + PDF + page_count, NOT by intake type.
        if (config.MULTIMODAL_ENABLED and page_count > 0
                and staged is not None and staged.suffix.lower() == ".pdf"):
            try:
                await progress("embedding_images", 70, f"מטמיע {page_count} עמודי תמונה (multimodal)")
                await _embed_pages(case_law_id, staged, page_count)
            except Exception as e:
                logger.warning("Multimodal embedding failed (non-fatal): %s", e)

        # Steps 10-12: queue BOTH extractions (GAP-02 fix) + statuses.
        await db.set_case_law_extraction_status(case_law_id, "completed")
        await db.set_case_law_halacha_status(case_law_id, "pending")
        await db.request_metadata_extraction(case_law_id)
        await db.request_halacha_extraction(case_law_id)
        await progress("completed", 100,
                       f"נקלט: {stored_chunks} chunks. חילוץ הלכות ומטא-דאטה ממתינים בתור.")
        return {
            "status": "completed",
            "case_law_id": str(case_law_id),
            "chunks": stored_chunks,
            "halachot": 0,
            "halachot_pending": True,
            "metadata_filled": [],
            "pages": page_count,
        }
    except Exception as e:
        logger.exception("ingest_document failed (%s): %s", spec.source_kind, e)
        await db.set_case_law_extraction_status(case_law_id, "failed")
        await progress("failed", 100, f"כשל בעיבוד: {e}")
        raise


async def _chunk_embed_store(case_law_id, text, page_offsets, page_count, progress) -> int:
    """Steps 7-8: chunk (hierarchical/flat by flag) → embed children → store."""
    if config.PARENT_DOC_RETRIEVAL_ENABLED:
        await progress("chunking", 40, f"מחלק את הטקסט ל-chunks היררכיים ({page_count} עמ')")
        h_chunks = chunker.chunk_document_hierarchical(text, page_offsets=page_offsets)
        if not h_chunks:
            return 0
        children = [c for c in h_chunks if c.role == "child"]
        parents = [c for c in h_chunks if c.role == "parent"]
        await progress("embedding", 55, f"מייצר embeddings ל-{len(children)} children ({len(parents)} parents)")
        child_vectors = await embeddings.embed_texts([c.content for c in children], input_type="document")
        chunk_dicts: list[dict] = []
        for p in parents:
            chunk_dicts.append({
                "role": "parent", "local_id": p.local_id, "parent_local_id": None,
                "chunk_index": p.chunk_index, "content": p.content,
                "section_type": p.section_type, "page_number": p.page_number, "embedding": None,
            })
        for c, v in zip(children, child_vectors):
            chunk_dicts.append({
                "role": "child", "local_id": c.local_id, "parent_local_id": c.parent_local_id,
                "chunk_index": c.chunk_index, "content": c.content,
                "section_type": c.section_type, "page_number": c.page_number, "embedding": v,
            })
        counts = await db.store_precedent_chunks_hierarchical(case_law_id, chunk_dicts)
        return counts["children"]
    else:
        await progress("chunking", 40, f"מחלק את הטקסט ל-chunks ({page_count} עמ')")
        chunks = chunker.chunk_document(text, page_offsets=page_offsets)
        if not chunks:
            return 0
        await progress("embedding", 55, f"מייצר embeddings ל-{len(chunks)} chunks")
        chunk_vectors = await embeddings.embed_texts([c.content for c in chunks], input_type="document")
        chunk_dicts = [
            {"chunk_index": c.chunk_index, "content": c.content,
             "section_type": c.section_type, "page_number": c.page_number, "embedding": v}
            for c, v in zip(chunks, chunk_vectors)
        ]
        return await db.store_precedent_chunks(case_law_id, chunk_dicts)
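Because the Step 9 gate looks only at the flag, the page count, and the staged file's suffix, an internal decision uploaded as a PDF now gets page-image embeddings too (GAP-05), while a text-only internal ingest never does. Pulling the condition into a pure function makes that matrix easy to check in isolation. The helper name should_embed_pages is hypothetical and not part of the plan.

```python
from __future__ import annotations

from pathlib import Path


def should_embed_pages(multimodal_enabled: bool, page_count: int, staged: Path | None) -> bool:
    """Hypothetical helper mirroring the Step 9 condition in ingest_document."""
    return (
        multimodal_enabled
        and page_count > 0
        and staged is not None            # text-only ingest never stages a file
        and staged.suffix.lower() == ".pdf"
    )
```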
- Step 2: Verify import
Run: cd ~/legal-ai/mcp-server && .venv/bin/python -c "from legal_mcp.services import ingest; print(ingest.ingest_document.__name__)"
Expected: prints ingest_document.
- Step 3: Commit
cd ~/legal-ai
git add mcp-server/src/legal_mcp/services/ingest.py
git commit -m "feat(ingest): canonical ingest_document pipeline (FU-1)"
Note on create_record kwargs: ingest_document passes full_text, case_name, decision_date, and document_id explicitly, plus every remaining inputs key except case_name/decision_date/file_path/text. The adapters in Tasks 4-5 (_create_external_record, _create_internal_record) then map those keys onto each DB-create's parameters; for example, citation becomes case_number on the external path. Verify against the signatures: create_external_case_law(case_number, full_text, court, practice_area, appeal_subtype, subject_tags, summary, headnote, source_type, precedent_level, is_binding, ...) and create_internal_committee_decision(case_number, full_text, court, chair_name, district, practice_area, appeal_subtype, subject_tags, summary, is_binding, proceeding_type, ...).
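The routing can be exercised without a database. The sketch below is a synchronous stand-in for the Step 6 call (the real call is awaited) plus a trimmed, hypothetical version of the external adapter. It also shows a constraint implied by the exclusion set: full_text and document_id are passed explicitly but not excluded, so if a wrapper ever put either key into inputs, the call would fail with a TypeError for a duplicate keyword argument.

```python
from __future__ import annotations

from datetime import date

# Same exclusion set as the create_record(...) call in ingest_document.
EXCLUDED = {"case_name", "decision_date", "file_path", "text"}


def route(create_record, inputs: dict, *, full_text: str, display_name: str,
          decision_date: date | None = None, document_id=None) -> dict:
    """Synchronous stand-in for Step 6 (hypothetical; the real call is awaited)."""
    return create_record(
        full_text=full_text,
        case_name=display_name,
        decision_date=decision_date,
        document_id=document_id,
        **{k: v for k, v in inputs.items() if k not in EXCLUDED},
    )


def external_adapter(**kw) -> dict:
    """Hypothetical, trimmed _create_external_record: renames citation to case_number."""
    return {
        "case_number": kw["citation"].strip(),
        "case_name": kw["case_name"],
        "court": (kw.get("court") or "").strip(),
        "full_text": kw["full_text"],
    }
```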
Task 4: External spec + rewrite ingest_precedent as wrapper
Files:
- Modify: mcp-server/src/legal_mcp/services/precedent_library.py
- Step 1: Replace the top-of-file ingest section with a spec + wrapper
Replace the body of ingest_precedent (lines ~88-317) and remove _stage_file, _coerce_date, _safe_filename, _embed_precedent_pages, and any other _VALID_* constants that only the old inline pipeline used. Keep the _VALID_PRACTICE_AREAS / _VALID_SOURCE_TYPES values, but wire them into the spec through enum_fields. Add:
from legal_mcp.services import ingest

PRECEDENT_LIBRARY_DIR = Path(config.DATA_DIR) / "precedent-library"
_VALID_PRACTICE_AREAS = frozenset({"", "rishuy_uvniya", "betterment_levy", "compensation_197"})
_VALID_SOURCE_TYPES = frozenset({"", "court_ruling", "appeals_committee"})


def _external_validate(inputs: dict) -> None:
    citation = (inputs.get("citation") or "").strip()
    if not citation:
        raise ValueError("citation is required")
    if citation.startswith(("ערר ", "ערר(", 'בל"מ ', 'בל"מ(', "ARAR ")):
        raise ValueError(
            "ציטוט שמתחיל ב-'ערר' או 'בל\"מ' הוא החלטת ועדת ערר. "
            "השתמש ב-internal_decision_upload (דורש chair_name + district), "
            "לא ב-precedent_library_upload."
        )


def _external_staging_subdir(inputs: dict) -> str:
    st = inputs.get("source_type") or ""
    return st if st in {"court_ruling", "appeals_committee"} else "other"


async def _create_external_record(**kw) -> dict:
    """Adapter: maps canonical inputs (citation) to create_external_case_law(case_number).

    Must stay above _EXTERNAL_SPEC, which references it when the module is imported.
    """
    return await db.create_external_case_law(
        case_number=kw["citation"].strip(),
        case_name=kw["case_name"],
        full_text=kw["full_text"],
        court=(kw.get("court") or "").strip(),
        decision_date=kw.get("decision_date"),
        practice_area=kw.get("practice_area", ""),
        appeal_subtype=(kw.get("appeal_subtype") or "").strip(),
        subject_tags=list(kw.get("subject_tags") or []),
        summary=(kw.get("summary") or "").strip(),
        headnote=(kw.get("headnote") or "").strip(),
        source_type=kw.get("source_type", ""),
        precedent_level=kw.get("precedent_level", ""),
        is_binding=kw.get("is_binding", True),
        document_id=kw.get("document_id"),
    )


_EXTERNAL_SPEC = ingest.IntakeSpec(
    source_kind="external_upload",
    id_field="citation",
    staging_root=PRECEDENT_LIBRARY_DIR,
    staging_subdir=_external_staging_subdir,
    validate=_external_validate,
    enum_fields={"practice_area": _VALID_PRACTICE_AREAS, "source_type": _VALID_SOURCE_TYPES},
    derive=lambda inputs: {},
    display_name_fallback="citation",
    create_record=_create_external_record,
)


async def ingest_precedent(
    *,
    file_path: str | Path,
    citation: str,
    case_name: str = "",
    court: str = "",
    decision_date=None,
    source_type: str = "",
    precedent_level: str = "",
    practice_area: str = "",
    appeal_subtype: str = "",
    subject_tags: list[str] | None = None,
    is_binding: bool = True,
    headnote: str = "",
    summary: str = "",
    document_id: UUID | None = None,
    progress: ingest.ProgressCb | None = None,
) -> dict:
    """Ingest one external precedent. Thin wrapper over the canonical pipeline."""
    inputs = {
        "citation": citation, "case_name": case_name, "court": court,
        "decision_date": decision_date, "source_type": source_type,
        "precedent_level": precedent_level, "practice_area": practice_area,
        "appeal_subtype": appeal_subtype, "subject_tags": subject_tags,
        "is_binding": is_binding, "headnote": headnote, "summary": summary,
    }
    return await ingest.ingest_document(
        _EXTERNAL_SPEC, inputs=inputs, file_path=file_path,
        document_id=document_id, progress=progress,
    )
_create_external_record must be defined above _EXTERNAL_SPEC: the name is looked up when the module-level ingest.IntakeSpec(...) call runs during import, so defining the adapter afterwards raises NameError on import.
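The ordering rule applies to any module-level expression, not specifically to dataclasses. The sketch below runs two module-like sources top to bottom in a fresh namespace, as an import would. WRONG_ORDER, RIGHT_ORDER, and fails_on_import are hypothetical, and a plain dict stands in for IntakeSpec.

```python
# Module-like source where the spec is built before the adapter it references.
WRONG_ORDER = """
SPEC = dict(create_record=_create_record)

def _create_record(**kw):
    return kw
"""

# Same source with the adapter defined first.
RIGHT_ORDER = """
def _create_record(**kw):
    return kw

SPEC = dict(create_record=_create_record)
"""


def fails_on_import(source: str) -> bool:
    """Execute source top to bottom in a fresh namespace; report a NameError."""
    try:
        exec(source, {})
    except NameError:
        return True
    return False
```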
- Step 2: Run external-path tests
Run: cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_unified_ingest.py -k "external" -v
Expected: test_external_queues_both, test_enum_validation_rejects_bad_practice_area_external,
test_external_citation_guard_still_blocks_arar PASS.
- Step 3: Commit
cd ~/legal-ai
git add mcp-server/src/legal_mcp/services/precedent_library.py
git commit -m "refactor(ingest): ingest_precedent delegates to canonical pipeline (FU-1)"
Task 5: Internal spec + rewrite ingest_internal_decision as wrapper
Files:
- Modify: mcp-server/src/legal_mcp/services/internal_decisions.py
- Step 1: Replace the ingest section with a spec + wrapper
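Remove _coerce_date, _safe_filename, and the inline pipeline body of ingest_internal_decision (lines ~73-220). Keep _VALID_DISTRICTS, _COURT_TO_DISTRICT, _district_from_court, and all migrate_*/enrich_*/search_internal functions. The _VALID_DISTRICTS line below only restates the expected values; if the module already defines it, keep that definition rather than adding a duplicate. Add: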
from legal_mcp.services import ingest

INTERNAL_DECISIONS_DIR = Path(config.DATA_DIR) / "internal-decisions"
_VALID_PRACTICE_AREAS = frozenset({"", "rishuy_uvniya", "betterment_levy", "compensation_197"})
_VALID_DISTRICTS = frozenset({"", "ירושלים", "מרכז", "תל אביב", "צפון", "דרום", "ארצי"})


def _internal_validate(inputs: dict) -> None:
    if not (inputs.get("case_number") or "").strip():
        raise ValueError("case_number is required")


def _internal_derive(inputs: dict) -> dict:
    district = (inputs.get("district") or "").strip() or _district_from_court(inputs.get("court") or "")
    proc = (inputs.get("proceeding_type") or "").strip() or derive_proceeding_type(
        appeal_subtype=inputs.get("appeal_subtype") or "", subject=inputs.get("case_name") or "",
    )
    return {"district": district, "proceeding_type": proc}


async def _create_internal_record(**kw) -> dict:
    return await db.create_internal_committee_decision(
        case_number=kw["case_number"].strip(),
        case_name=kw["case_name"],
        full_text=kw["full_text"],
        court=(kw.get("court") or "").strip(),
        decision_date=kw.get("decision_date"),
        chair_name=(kw.get("chair_name") or "").strip(),
        district=kw.get("district", ""),
        practice_area=kw.get("practice_area", ""),
        appeal_subtype=(kw.get("appeal_subtype") or "").strip(),
        subject_tags=list(kw.get("subject_tags") or []),
        summary=(kw.get("summary") or "").strip(),
        is_binding=kw.get("is_binding", True),
        document_id=kw.get("document_id"),
        proceeding_type=kw.get("proceeding_type") or "ערר",
    )


_INTERNAL_SPEC = ingest.IntakeSpec(
    source_kind="internal_committee",
    id_field="case_number",
    staging_root=INTERNAL_DECISIONS_DIR,
    staging_subdir=lambda inputs: (inputs.get("district") or "other"),
    validate=_internal_validate,
    enum_fields={"practice_area": _VALID_PRACTICE_AREAS, "district": _VALID_DISTRICTS},
    derive=_internal_derive,
    display_name_fallback="case_number",
    create_record=_create_internal_record,
)


async def ingest_internal_decision(
    *,
    case_number: str,
    case_name: str = "",
    court: str = "",
    decision_date=None,
    chair_name: str = "",
    district: str = "",
    practice_area: str = "",
    appeal_subtype: str = "",
    subject_tags: list[str] | None = None,
    summary: str = "",
    is_binding: bool = True,
    file_path: str | Path | None = None,
    text: str | None = None,
    document_id: UUID | None = None,
    queue_halachot: bool = True,  # retained for signature compat; pipeline always queues
    proceeding_type: str = "",
) -> dict:
    """Ingest one appeals-committee decision. Thin wrapper over the canonical pipeline."""
    inputs = {
        "case_number": case_number, "case_name": case_name, "court": court,
        "decision_date": decision_date, "chair_name": chair_name, "district": district,
        "practice_area": practice_area, "appeal_subtype": appeal_subtype,
        "subject_tags": subject_tags, "summary": summary, "is_binding": is_binding,
        "proceeding_type": proceeding_type,
    }
    out = await ingest.ingest_document(
        _INTERNAL_SPEC, inputs=inputs, file_path=file_path, text=text,
        document_id=document_id,
    )
    return {"status": out["status"], "case_law_id": out["case_law_id"],
            "chunks": out["chunks"], "halachot_pending": True}
queue_halachot=False was only used by migrate_from_style_corpus. The canonical pipeline always queues both extractions (per INV-ING3). Confirm with the user during execution that queueing during bulk re-migration is acceptable; the migrate path is out of FU-1 scope but calls this wrapper. If suppression is still required, record it as a follow-up; do not silently drop it.
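Step 2 of ingest_document merges the derived keys over the originals, so _internal_derive decides the final district and proceeding_type. The sketch below reproduces _internal_derive and that merge. _COURT_TO_DISTRICT, _district_from_court, and derive_proceeding_type are hypothetical stand-ins here; the real ones live in internal_decisions.py and are not reproduced. One consequence of the ordering is that enum validation (Step 1) runs before derivation, so a district filled in from the court name is never checked against _VALID_DISTRICTS. Whether that matters depends on what the real _COURT_TO_DISTRICT can return.

```python
# Hypothetical stand-ins for helpers that already exist in internal_decisions.py.
_COURT_TO_DISTRICT = {"ועדת ערר ירושלים": "ירושלים"}


def _district_from_court(court: str) -> str:
    return _COURT_TO_DISTRICT.get(court.strip(), "")


def derive_proceeding_type(*, appeal_subtype: str, subject: str) -> str:
    return "ערר"  # placeholder; the real classification rules are not shown in the plan


def internal_derive(inputs: dict) -> dict:
    """Copy of _internal_derive: explicit values win, blanks are derived."""
    district = (inputs.get("district") or "").strip() or _district_from_court(inputs.get("court") or "")
    proc = (inputs.get("proceeding_type") or "").strip() or derive_proceeding_type(
        appeal_subtype=inputs.get("appeal_subtype") or "", subject=inputs.get("case_name") or "",
    )
    return {"district": district, "proceeding_type": proc}


def merge(inputs: dict) -> dict:
    # Same merge as Step 2 of ingest_document: derived keys overwrite the originals.
    return {**inputs, **internal_derive(inputs)}
```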
- Step 2: Run the full test file
Run: cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_unified_ingest.py -v
Expected: ALL 9 tests PASS — including test_internal_queues_BOTH_metadata_and_halacha (GAP-02).
- Step 3: Commit
cd ~/legal-ai
git add mcp-server/src/legal_mcp/services/internal_decisions.py
git commit -m "refactor(ingest): ingest_internal_decision delegates to canonical pipeline; queue metadata too (GAP-02, FU-1)"
Task 6: Dead-code sweep, smoke import, full suite
Files:
- Verify: mcp-server/src/legal_mcp/services/precedent_library.py, internal_decisions.py
- Step 1: Confirm no orphaned references to removed helpers
Run: cd ~/legal-ai/mcp-server && grep -rn "_embed_precedent_pages\|_stage_file\|_safe_filename\|_coerce_date" src/legal_mcp/services/precedent_library.py src/legal_mcp/services/internal_decisions.py
Expected: no matches, since all four helpers moved to ingest.py. If a match is still called by non-ingest code in that module, leave it; if it is an orphaned definition, delete it.
- Step 2: Smoke-import every affected module + its callers
Run:
cd ~/legal-ai/mcp-server && .venv/bin/python -c "
from legal_mcp.services import ingest, precedent_library, internal_decisions
from legal_mcp.tools import precedent_library as t1, internal_decisions as t2
import inspect
sig_p = inspect.signature(precedent_library.ingest_precedent)
sig_i = inspect.signature(internal_decisions.ingest_internal_decision)
assert 'citation' in sig_p.parameters and 'file_path' in sig_p.parameters
assert 'case_number' in sig_i.parameters and 'text' in sig_i.parameters
print('signatures preserved; imports clean')
"
Expected: prints signatures preserved; imports clean.
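The smoke check above only confirms that two parameter names exist on each wrapper. A stricter option is to snapshot every parameter's name, kind, and default, which would also catch a removed keyword or a changed default such as queue_halachot. One way to do this is sketched below. snapshot, before, and after are hypothetical; in practice you would record snapshot(...) of the real wrappers before Task 4 and compare after Task 5.

```python
from __future__ import annotations

import inspect


def snapshot(fn) -> list[tuple[str, str, object]]:
    """Name, kind and default of every parameter; any drift shows up as a list diff."""
    return [(p.name, p.kind.name, p.default) for p in inspect.signature(fn).parameters.values()]


# Hypothetical stand-ins, not the real wrappers.
def before(*, case_number: str, text: str | None = None, queue_halachot: bool = True) -> dict:
    return {}


def after(*, case_number: str, text: str | None = None, queue_halachot: bool = False) -> dict:
    return {}
```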
- Step 3: Run the entire test suite (no regressions elsewhere)
Run: cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/ -q
Expected: all pre-existing tests still pass + the 9 new ones.
- Step 4: Lint the changed files (match repo style)
Run: cd ~/legal-ai/mcp-server && if .venv/bin/python -m ruff --version >/dev/null 2>&1; then .venv/bin/python -m ruff check src/legal_mcp/services/ingest.py src/legal_mcp/services/precedent_library.py src/legal_mcp/services/internal_decisions.py; else echo "ruff not configured: skip"; fi
Expected: clean, or "skip".
- Step 5: Update TaskMaster #59 → done
Mark subtasks 59.1-59.4 and task 59 as done via task-master (verify via MCP get_task).
- Step 6: Final commit
cd ~/legal-ai
git add -A mcp-server/
git commit -m "chore(ingest): dead-code sweep + smoke checks for unified pipeline (FU-1)"
Self-Review Notes
- GAP-01 (single path) → Tasks 2-5.
- GAP-02 (metadata queue) → Task 3 Step 1 + test_internal_queues_BOTH_metadata_and_halacha.
- GAP-04 (enum validation) → _validate_enums + tests.
- GAP-05 (staging/derive/multimodal/fallback/guard unified) → Task 3 + specs in Tasks 4-5.
- Boundary preserved: DB-create functions untouched (routed via create_record); no migration.
- Open execution check: queue_halachot=False suppression in migrate_from_style_corpus (Task 5 note). Surface it to the user; do not silently change bulk-migration behavior.
- claude_session rule: ingest.py imports only db/chunker/embeddings/extractor, no LLM extractors. Safe for the container.