# FU-1 Unified Ingest Path: Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Collapse the two parallel ingest functions (`ingest_precedent`, `ingest_internal_decision`) into one canonical pipeline parameterized by an `IntakeSpec`, closing GAP-01/02/04/05.

**Architecture:** New module `services/ingest.py` holds a Template-Method skeleton `ingest_document(spec, ...)`. Per-type variation rides on a frozen `IntakeSpec` config object (staging resolver, validate callable, enum_fields data, derive callable, display-name fallback, injected `create_record`). The two existing public functions stay as named entry points that build a spec and delegate. The DB-create functions are NOT merged (FU-2 boundary); they are only routed through `spec.create_record`.
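
Here is a minimal, synchronous sketch of that Template-Method shape, using only the standard library. `Spec`, `run_pipeline`, `require`, and the two toy specs are hypothetical stand-ins, not the real `IntakeSpec`/`ingest_document` API. The DB create and the extraction queue are replaced by returned dicts. The sketch only shows the division of labor: the skeleton owns the step order, and each spec supplies data and callables.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class Spec:
    """Toy stand-in for IntakeSpec: only data and callables, no behavior of its own."""

    source_kind: str
    validate: Callable[[dict], None]
    enum_fields: dict[str, frozenset[str]]
    derive: Callable[[dict], dict]
    display_name_fallback: str
    create_record: Callable[..., dict]


def run_pipeline(spec: Spec, inputs: dict, text: str) -> dict:
    """Template method: the step order is fixed here; the spec fills in each step."""
    spec.validate(inputs)                               # type-specific checks
    for name, allowed in spec.enum_fields.items():      # one enum mechanism for all types
        if (inputs.get(name) or "") not in allowed:
            raise ValueError(f"invalid {name}: {inputs.get(name)!r}")
    inputs = {**inputs, **spec.derive(inputs)}          # derived fields override inputs
    display = (inputs.get("case_name") or "").strip() or (
        inputs.get(spec.display_name_fallback) or "").strip()
    rest = {k: v for k, v in inputs.items() if k != "case_name"}
    record = spec.create_record(case_name=display, full_text=text, **rest)
    queued = ["metadata", "halacha"]                    # identical for every intake type
    return {"kind": spec.source_kind, "record": record, "queued": queued}


def require(field: str) -> Callable[[dict], None]:
    """Build a validator that rejects a missing or blank field."""
    def check(inputs: dict) -> None:
        if not (inputs.get(field) or "").strip():
            raise ValueError(f"{field} is required")
    return check


AREAS = frozenset({"", "betterment_levy", "rishuy_uvniya"})

external = Spec(
    source_kind="external_upload",
    validate=require("citation"),
    enum_fields={"practice_area": AREAS},
    derive=lambda inputs: {},
    display_name_fallback="citation",
    create_record=lambda **kw: {"table": "external", **kw},
)
internal = Spec(
    source_kind="internal_committee",
    validate=require("case_number"),
    enum_fields={"practice_area": AREAS},
    derive=lambda inputs: {"district": inputs.get("district") or "other"},
    display_name_fallback="case_number",
    create_record=lambda **kw: {"table": "internal", **kw},
)

out = run_pipeline(internal, {"case_number": "8046/24", "case_name": "", "practice_area": "betterment_levy"}, "t")
print(out["record"]["case_name"], out["queued"])  # 8046/24 ['metadata', 'halacha']
```

Under this shape, a third intake type would be one more spec object rather than another pipeline function.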

**Tech Stack:** Python 3.12, asyncpg, pytest (offline, monkeypatched I/O), local `.venv` at `mcp-server/.venv`.

**Spec:** [docs/superpowers/specs/2026-05-30-fu1-unified-ingest-design.md](../specs/2026-05-30-fu1-unified-ingest-design.md)

**Run tests with:** `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_unified_ingest.py -v`

---

## File Structure

- **Create** `mcp-server/src/legal_mcp/services/ingest.py`: canonical pipeline + `IntakeSpec` + shared helpers (`_stage_file`, `_coerce_date`, `_safe_filename`, `_embed_pages`).
- **Create** `mcp-server/tests/test_unified_ingest.py`: offline behavioral tests.
- **Modify** `mcp-server/src/legal_mcp/services/precedent_library.py`: `ingest_precedent` becomes a thin wrapper that builds `_EXTERNAL_SPEC`. Delete the inline pipeline and the moved helpers. Keep everything else (search, reextract, process_pending, list, delete, get).
- **Modify** `mcp-server/src/legal_mcp/services/internal_decisions.py`: `ingest_internal_decision` becomes a thin wrapper that builds `_INTERNAL_SPEC`. Delete the inline pipeline and the moved helpers. Keep migrate_*, enrich_*, search_internal.

**Unchanged callers (verify, don't edit):** `tools/precedent_library.py`, `tools/internal_decisions.py`, and the `web/` HTTP handlers. They call the two public functions, and those signatures are preserved.

---

## Task 1: Failing tests for the unified pipeline

**Files:**
- Test: `mcp-server/tests/test_unified_ingest.py`

- [ ] **Step 1: Write the failing tests**

```python
"""FU-1: unified ingest pipeline tests (offline, all I/O monkeypatched).

Proves both intake types flow through services.ingest.ingest_document and that
the canonical pipeline is symmetric: BOTH metadata and halacha extraction are
queued for BOTH types (GAP-02 regression), enum validation applies to both
(GAP-04), the display-name fallback uses the canonical id, and the external
citation guard is preserved. Multimodal is forced OFF here, so its flag+PDF
gate (GAP-05) is not exercised by these tests.
"""
from __future__ import annotations

import asyncio
from uuid import uuid4

import pytest

from legal_mcp import config
from legal_mcp.services import db, embeddings, chunker, extractor
from legal_mcp.services import ingest, precedent_library, internal_decisions


def _run(coro):
    return asyncio.run(coro)


class _Chunk:
    """Minimal stand-in for a chunker chunk."""

    def __init__(self, i):
        self.chunk_index = i
        self.content = f"chunk-{i}"
        self.section_type = "body"
        self.page_number = 1
        self.role = "child"
        self.local_id = f"c{i}"
        self.parent_local_id = None


@pytest.fixture()
def patched(monkeypatch, tmp_path):
    """Patch every I/O boundary. Record queue + create calls."""
    calls = {"metadata": [], "halacha": [], "create": [], "chunks": [], "pages": []}

    async def _extract_text(path):
        return ("full decision text", 2, [0, 100])  # (text, page_count, page_offsets)

    def _strip(text):
        return text

    def _chunk(text, page_offsets=None):
        return [_Chunk(0), _Chunk(1)]

    async def _embed(texts, input_type="document"):
        return [[0.0] * 8 for _ in texts]

    async def _store_chunks(cid, dicts):
        calls["chunks"].append((cid, len(dicts)))
        return len(dicts)

    async def _create_external(**kw):
        calls["create"].append(("external", kw))
        return {"id": uuid4()}

    async def _create_internal(**kw):
        calls["create"].append(("internal", kw))
        return {"id": uuid4()}

    async def _req_meta(cid):
        calls["metadata"].append(cid)

    async def _req_hal(cid):
        calls["halacha"].append(cid)

    async def _set_status(cid, status):
        return None

    monkeypatch.setattr(extractor, "extract_text", _extract_text)
    monkeypatch.setattr(extractor, "strip_nevo_preamble", _strip)
    monkeypatch.setattr(chunker, "chunk_document", _chunk)
    monkeypatch.setattr(embeddings, "embed_texts", _embed)
    monkeypatch.setattr(db, "store_precedent_chunks", _store_chunks)
    monkeypatch.setattr(db, "create_external_case_law", _create_external)
    monkeypatch.setattr(db, "create_internal_committee_decision", _create_internal)
    monkeypatch.setattr(db, "request_metadata_extraction", _req_meta)
    monkeypatch.setattr(db, "request_halacha_extraction", _req_hal)
    monkeypatch.setattr(db, "set_case_law_extraction_status", _set_status)
    monkeypatch.setattr(db, "set_case_law_halacha_status", _set_status)
    # "Stage" the tmp_path upload in place instead of copying it into the real
    # DATA_DIR staging tree, so the tests never write to shared storage.
    monkeypatch.setattr(ingest, "_stage_file", lambda src, root, subdir: src)
    # Force flat chunking + multimodal OFF unless a test flips it.
    monkeypatch.setattr(config, "PARENT_DOC_RETRIEVAL_ENABLED", False)
    monkeypatch.setattr(config, "MULTIMODAL_ENABLED", False)
    return calls


def _make_pdf(tmp_path) -> str:
    p = tmp_path / "decision.pdf"
    p.write_bytes(b"%PDF-1.4 fake")
    return str(p)


def test_internal_queues_BOTH_metadata_and_halacha(patched, tmp_path):
    """GAP-02 regression: the internal path must queue metadata too."""
    _run(internal_decisions.ingest_internal_decision(
        case_number="8046/24", text="decision text", chair_name="דפנה תמיר",
        district="ירושלים", practice_area="betterment_levy",
    ))
    assert len(patched["metadata"]) == 1, "internal path must queue metadata (GAP-02)"
    assert len(patched["halacha"]) == 1


def test_external_queues_both(patched, tmp_path):
    _run(precedent_library.ingest_precedent(
        file_path=_make_pdf(tmp_path), citation="עע\"מ 1234/20",
        practice_area="rishuy_uvniya", source_type="court_ruling",
    ))
    assert len(patched["metadata"]) == 1
    assert len(patched["halacha"]) == 1


def test_both_types_go_through_ingest_document(patched, tmp_path, monkeypatch):
    seen = []
    real = ingest.ingest_document

    async def _spy(spec, **kw):
        seen.append(spec.source_kind)
        return await real(spec, **kw)

    monkeypatch.setattr(ingest, "ingest_document", _spy)
    _run(internal_decisions.ingest_internal_decision(
        case_number="8046/24", text="t", chair_name="דפנה תמיר", practice_area="betterment_levy"))
    _run(precedent_library.ingest_precedent(
        file_path=_make_pdf(tmp_path), citation="עע\"מ 1/20", practice_area="rishuy_uvniya"))
    assert seen == ["internal_committee", "external_upload"]


def test_enum_validation_rejects_bad_practice_area_internal(patched, tmp_path):
    """GAP-04: internal path must validate enums like the external one."""
    with pytest.raises(ValueError, match="practice_area"):
        _run(internal_decisions.ingest_internal_decision(
            case_number="8046/24", text="t", chair_name="x", practice_area="bogus"))


def test_enum_validation_rejects_bad_practice_area_external(patched, tmp_path):
    with pytest.raises(ValueError, match="practice_area"):
        _run(precedent_library.ingest_precedent(
            file_path=_make_pdf(tmp_path), citation="עע\"מ 1/20", practice_area="bogus"))


def test_external_citation_guard_still_blocks_arar(patched, tmp_path):
    with pytest.raises(ValueError, match="ערר"):
        _run(precedent_library.ingest_precedent(
            file_path=_make_pdf(tmp_path), citation="ערר 1234/24"))


def test_internal_text_path_works_without_file(patched):
    out = _run(internal_decisions.ingest_internal_decision(
        case_number="8046/24", text="t", chair_name="x", practice_area="betterment_levy"))
    assert out["status"] == "completed"
    assert out["case_law_id"]


def test_internal_requires_file_or_text(patched):
    with pytest.raises(ValueError, match="file_path or text"):
        _run(internal_decisions.ingest_internal_decision(
            case_number="8046/24", chair_name="x", practice_area="betterment_levy"))


def test_display_name_fallback_uses_canonical_id(patched, tmp_path):
    _run(internal_decisions.ingest_internal_decision(
        case_number="8046/24", text="t", chair_name="x", practice_area="betterment_levy"))
    _kind, kw = patched["create"][0]
    assert kw["case_name"] == "8046/24", "missing case_name falls back to canonical id"
```
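
`test_both_types_go_through_ingest_document` sees both calls only if the wrappers reach the pipeline through the module attribute, `ingest.ingest_document(...)`, which is how Tasks 4-5 call it. A name bound with `from legal_mcp.services.ingest import ingest_document` would keep the original function and bypass the spy. The stdlib-only sketch below reproduces both cases with a throwaway module object. `fake_ingest`, `wrapper_attr`, and `wrapper_bound` are hypothetical names.

```python
import types

# Hypothetical stand-in for the legal_mcp.services.ingest module.
fake_ingest = types.ModuleType("fake_ingest")


def _real(spec):
    return f"ran {spec}"


fake_ingest.ingest_document = _real

# Early binding, as `from fake_ingest import ingest_document` would do.
ingest_document = fake_ingest.ingest_document


def wrapper_attr(spec):
    # Late binding: the attribute is looked up on every call (Tasks 4-5 style).
    return fake_ingest.ingest_document(spec)


def wrapper_bound(spec):
    # Uses the function object captured above; a later patch is invisible here.
    return ingest_document(spec)


seen = []


def spy(spec):
    seen.append(spec)
    return _real(spec)


fake_ingest.ingest_document = spy  # what monkeypatch.setattr(ingest, ...) does
wrapper_attr("internal_committee")
wrapper_bound("external_upload")
print(seen)  # ['internal_committee']
```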

- [ ] **Step 2: Run tests to verify they fail**

Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_unified_ingest.py -v`
Expected: FAIL at collection with `ImportError: cannot import name 'ingest' from 'legal_mcp.services'`, because the module does not exist yet.

- [ ] **Step 3: Commit the red tests**

```bash
cd ~/legal-ai
git add mcp-server/tests/test_unified_ingest.py
git commit -m "test(ingest): failing tests for unified pipeline (FU-1)"
```

---

## Task 2: Canonical module `ingest.py` (IntakeSpec + shared helpers)

**Files:**
- Create: `mcp-server/src/legal_mcp/services/ingest.py`

- [ ] **Step 1: Write the module header, IntakeSpec, and shared helpers**

```python
"""Canonical ingest pipeline (FU-1).

One pipeline for all sibling-entity intake types (external precedent,
internal committee decision). Per-type variation rides on an ``IntakeSpec``
config object, never a parallel function. See
docs/spec/01-ingest.md and docs/superpowers/specs/2026-05-30-fu1-unified-ingest-design.md.

claude_session rule preserved: this module only QUEUES extraction
(``request_*_extraction`` = pure DB writes). It never imports
halacha_extractor / precedent_metadata_extractor, so it is safe to call
from the FastAPI container where the ``claude`` CLI is unavailable.
"""
from __future__ import annotations

import asyncio
import logging
import re
import shutil
from dataclasses import dataclass
from datetime import date
from pathlib import Path
from typing import Awaitable, Callable
from uuid import UUID, uuid4

from legal_mcp import config
from legal_mcp.services import chunker, db, embeddings, extractor

logger = logging.getLogger(__name__)

# progress(status, percent, message), awaited at each pipeline milestone.
ProgressCb = Callable[[str, int, str], Awaitable[None]]


async def _noop_progress(_status: str, _percent: int, _msg: str) -> None:
    return None


@dataclass(frozen=True)
class IntakeSpec:
    """Describes everything that varies between intake types."""

    source_kind: str                               # "external_upload" / "internal_committee"
    id_field: str                                  # canonical id key in inputs (citation / case_number)
    staging_root: Path                             # where uploaded files are copied
    staging_subdir: Callable[[dict], str]          # inputs -> subdirectory under staging_root
    validate: Callable[[dict], None]               # type-specific checks; raises ValueError
    enum_fields: dict[str, frozenset[str]]         # field name -> allowed values
    derive: Callable[[dict], dict]                 # fields merged over inputs before staging/create
    display_name_fallback: str                     # inputs key used when case_name is empty
    create_record: Callable[..., Awaitable[dict]]  # DB-create adapter; returns a row with "id"


def _coerce_date(value) -> date | None:
    """Accept a date, an ISO string (first 10 chars are parsed), or empty; else None."""
    if value is None or value == "":
        return None
    if isinstance(value, date):
        return value
    if isinstance(value, str):
        try:
            return date.fromisoformat(value[:10])
        except ValueError:
            return None
    return None


def _safe_filename(name: str) -> str:
    """Drop directory parts; replace anything but word chars, '.', '-', '+', Hebrew, space."""
    base = Path(name).name
    return re.sub(r"[^\w.\-+א-ת ]", "_", base) or f"upload-{uuid4().hex[:8]}"


def _stage_file(src_path: Path, root: Path, subdir: str) -> Path:
    """Copy the upload to root/subdir/<8 hex chars>_<safe name> and return the new path."""
    dest_dir = root / (subdir or "other")
    dest_dir.mkdir(parents=True, exist_ok=True)
    dest = dest_dir / f"{uuid4().hex[:8]}_{_safe_filename(src_path.name)}"
    shutil.copy2(src_path, dest)
    return dest


def _validate_enums(spec: IntakeSpec, inputs: dict) -> None:
    """Uniform enum check for every intake type (GAP-04); a missing or None value counts as ""."""
    for field_name, allowed in spec.enum_fields.items():
        value = inputs.get(field_name, "") or ""
        if value not in allowed:
            raise ValueError(f"invalid {field_name}: {value!r}")
```

- [ ] **Step 2: Add the multimodal page-embed helper (moved from `precedent_library._embed_precedent_pages` and renamed `_embed_pages`)**

```python
async def _embed_pages(case_law_id: UUID, pdf_path: Path, page_count: int) -> dict:
    """Render PDF pages → embed via voyage-multimodal → store.

    Failures are non-fatal: the caller catches and logs them. ``page_count`` is
    currently unused and kept only for the call signature.
    """
    thumb_dir = spec_thumb_dir(case_law_id)
    rendered = await asyncio.to_thread(
        extractor.render_pages_for_multimodal,
        pdf_path, config.MULTIMODAL_DPI, config.MULTIMODAL_THUMB_DPI, thumb_dir,
    )
    images = [pil for pil, _ in rendered]
    thumbs = [t for _, t in rendered]
    img_embs = await embeddings.embed_images(images)

    page_records = []
    for i, (emb, thumb) in enumerate(zip(img_embs, thumbs)):
        rel_thumb = None
        if thumb is not None:
            try:
                rel_thumb = str(thumb.relative_to(config.DATA_DIR))
            except ValueError:
                rel_thumb = str(thumb)
        page_records.append({
            "page_number": i + 1, "embedding": emb, "image_thumbnail_path": rel_thumb,
        })
    stored = await db.store_precedent_image_embeddings(
        case_law_id, page_records, model_name=config.MULTIMODAL_MODEL,
    )
    logger.info("Multimodal: stored %d page-image embeddings for case_law %s", stored, case_law_id)
    return {"pages_embedded": stored}


def spec_thumb_dir(case_law_id: UUID) -> Path:
    """Thumbnails live under the precedent-library tree regardless of intake type."""
    return Path(config.DATA_DIR) / "precedent-library" / "thumbnails" / str(case_law_id)
```
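
`_embed_pages` stores each thumbnail path relative to `config.DATA_DIR` when it can, and falls back to the absolute path otherwise, because `Path.relative_to` raises `ValueError` for a path outside the base. The stdlib-only sketch below shows that rule in isolation. `thumb_ref` and the `/data` and `/tmp` paths are illustrative. `PurePosixPath` is used so the example behaves the same on any OS.

```python
from __future__ import annotations

from pathlib import PurePosixPath


def thumb_ref(thumb: PurePosixPath | None, data_dir: PurePosixPath) -> str | None:
    """Illustrative copy of the rel_thumb logic in _embed_pages."""
    if thumb is None:
        return None
    try:
        return str(thumb.relative_to(data_dir))
    except ValueError:  # thumbnail outside DATA_DIR: store the absolute path
        return str(thumb)


data_dir = PurePosixPath("/data")
print(thumb_ref(PurePosixPath("/data/precedent-library/thumbnails/abc/p1.png"), data_dir))
# precedent-library/thumbnails/abc/p1.png
print(thumb_ref(PurePosixPath("/tmp/p1.png"), data_dir))  # /tmp/p1.png
```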

- [ ] **Step 3: Verify the module imports cleanly**

Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -c "from legal_mcp.services import ingest; print(ingest.IntakeSpec.__name__)"`
Expected: prints `IntakeSpec`, no error.

- [ ] **Step 4: Commit**

```bash
cd ~/legal-ai
git add mcp-server/src/legal_mcp/services/ingest.py
git commit -m "feat(ingest): IntakeSpec + shared helpers for canonical pipeline (FU-1)"
```

---

## Task 3: Canonical `ingest_document`

**Files:**
- Modify: `mcp-server/src/legal_mcp/services/ingest.py` (append `ingest_document`)

- [ ] **Step 1: Append the canonical pipeline function**

```python
async def ingest_document(
    spec: IntakeSpec,
    *,
    inputs: dict,
    file_path: str | Path | None = None,
    text: str | None = None,
    document_id: UUID | None = None,
    progress: ProgressCb | None = None,
) -> dict:
    """Run the canonical 12-step pipeline for one intake item.

    ``inputs`` carries the type-specific record fields (citation/case_number,
    case_name, court, practice_area, etc.). ``spec`` decides how they are
    validated, staged, and derived, and which DB-create runs. Returns a dict with
    at least: status, case_law_id, chunks.
    """
    progress = progress or _noop_progress
    # Progress messages are user-facing Hebrew UI strings; the status keys carry the meaning.

    # Step 1: input validation (type-specific) + enums (uniform mechanism).
    if not file_path and text is None:
        raise ValueError("either file_path or text is required")
    spec.validate(inputs)
    _validate_enums(spec, inputs)

    # Step 2: field derivation (identity for external).
    inputs = {**inputs, **spec.derive(inputs)}

    # Steps 3-5: stage (if file) + extract + strip.
    page_count = 0
    page_offsets = None
    staged: Path | None = None
    if file_path:
        src = Path(file_path)
        if not src.is_file():
            raise FileNotFoundError(f"file not found: {src}")
        await progress("staging", 5, "מעתיק את הקובץ לאחסון")
        staged = _stage_file(src, spec.staging_root, spec.staging_subdir(inputs))
        await progress("extracting", 15, "מחלץ טקסט מהקובץ")
        try:
            raw_text, page_count, page_offsets = await extractor.extract_text(str(staged))
        except Exception as e:
            await progress("failed", 100, f"כשל בחילוץ טקסט: {e}")
            raise
        raw_text = extractor.strip_nevo_preamble(raw_text or "").strip()
    else:
        raw_text = (text or "").strip()
    if not raw_text:
        await progress("failed", 100, "לא נמצא טקסט בקובץ")
        raise ValueError("no extractable text in file")

    # Step 6: DB create (type-specific, routed through the spec; yields case_law_id).
    await progress("storing_metadata", 25, "שומר את הרשומה במסד הנתונים")
    display_name = (inputs.get("case_name") or "").strip() or (
        inputs.get(spec.display_name_fallback) or ""
    ).strip()
    record = await spec.create_record(
        full_text=raw_text,
        case_name=display_name,
        decision_date=_coerce_date(inputs.get("decision_date")),
        document_id=document_id,
        # Every other input is forwarded; the spec's adapter maps names to DB params.
        **{k: v for k, v in inputs.items()
           if k not in {"case_name", "decision_date", "file_path", "text"}},
    )
    case_law_id = UUID(str(record["id"]))

    try:
        # Steps 7-8: chunk + embed + store.
        stored_chunks = await _chunk_embed_store(case_law_id, raw_text, page_offsets, page_count, progress)

        # Step 9: multimodal. Uniform rule: flag + PDF + page_count, NOT intake type.
        if (config.MULTIMODAL_ENABLED and page_count > 0
                and staged is not None and staged.suffix.lower() == ".pdf"):
            try:
                await progress("embedding_images", 70, f"מטמיע {page_count} עמודי תמונה (multimodal)")
                await _embed_pages(case_law_id, staged, page_count)
            except Exception as e:
                logger.warning("Multimodal embedding failed (non-fatal): %s", e)

        # Steps 10-12: queue BOTH extractions (GAP-02 fix) + statuses.
        await db.set_case_law_extraction_status(case_law_id, "completed")
        await db.set_case_law_halacha_status(case_law_id, "pending")
        await db.request_metadata_extraction(case_law_id)
        await db.request_halacha_extraction(case_law_id)

        await progress("completed", 100,
                       f"נקלט: {stored_chunks} chunks. חילוץ הלכות ומטא-דאטה ממתינים בתור.")
        return {
            "status": "completed",
            "case_law_id": str(case_law_id),
            "chunks": stored_chunks,
            "halachot": 0,
            "halachot_pending": True,
            "metadata_filled": [],
            "pages": page_count,
        }
    except Exception as e:
        logger.exception("ingest_document failed (%s): %s", spec.source_kind, e)
        await db.set_case_law_extraction_status(case_law_id, "failed")
        await progress("failed", 100, f"כשל בעיבוד: {e}")
        raise


async def _chunk_embed_store(case_law_id, text, page_offsets, page_count, progress) -> int:
    """Steps 7-8: chunk (hierarchical/flat by flag) → embed children → store."""
    if config.PARENT_DOC_RETRIEVAL_ENABLED:
        await progress("chunking", 40, f"מחלק את הטקסט ל-chunks היררכיים ({page_count} עמ')")
        h_chunks = chunker.chunk_document_hierarchical(text, page_offsets=page_offsets)
        if not h_chunks:
            return 0
        children = [c for c in h_chunks if c.role == "child"]
        parents = [c for c in h_chunks if c.role == "parent"]
        await progress("embedding", 55, f"מייצר embeddings ל-{len(children)} children ({len(parents)} parents)")
        child_vectors = await embeddings.embed_texts([c.content for c in children], input_type="document")
        chunk_dicts: list[dict] = []
        for p in parents:
            # Parents carry no embedding; only children are vector-searched.
            chunk_dicts.append({
                "role": "parent", "local_id": p.local_id, "parent_local_id": None,
                "chunk_index": p.chunk_index, "content": p.content,
                "section_type": p.section_type, "page_number": p.page_number, "embedding": None,
            })
        for c, v in zip(children, child_vectors):
            chunk_dicts.append({
                "role": "child", "local_id": c.local_id, "parent_local_id": c.parent_local_id,
                "chunk_index": c.chunk_index, "content": c.content,
                "section_type": c.section_type, "page_number": c.page_number, "embedding": v,
            })
        counts = await db.store_precedent_chunks_hierarchical(case_law_id, chunk_dicts)
        return counts["children"]
    else:
        await progress("chunking", 40, f"מחלק את הטקסט ל-chunks ({page_count} עמ')")
        chunks = chunker.chunk_document(text, page_offsets=page_offsets)
        if not chunks:
            return 0
        await progress("embedding", 55, f"מייצר embeddings ל-{len(chunks)} chunks")
        chunk_vectors = await embeddings.embed_texts([c.content for c in chunks], input_type="document")
        chunk_dicts = [
            {"chunk_index": c.chunk_index, "content": c.content,
             "section_type": c.section_type, "page_number": c.page_number, "embedding": v}
            for c, v in zip(chunks, chunk_vectors)
        ]
        return await db.store_precedent_chunks(case_law_id, chunk_dicts)
```
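
Step 9's condition reads only the flag, the page count, and the staged file's suffix. It never reads `spec.source_kind`. If the check were factored into a pure predicate, the GAP-05 rule could be tested without any I/O. The sketch below shows that refactor. `should_embed_pages` is a hypothetical name; the plan keeps the check inline.

```python
from __future__ import annotations

from pathlib import Path


def should_embed_pages(enabled: bool, page_count: int, staged: Path | None) -> bool:
    """Same condition as the inline step-9 check in ingest_document."""
    return (
        enabled
        and page_count > 0
        and staged is not None
        and staged.suffix.lower() == ".pdf"
    )


# The intake type is not a parameter, so both types are treated identically.
assert should_embed_pages(True, 3, Path("staged/ab12_decision.PDF"))
assert not should_embed_pages(False, 3, Path("staged/ab12_decision.pdf"))  # flag off
assert not should_embed_pages(True, 0, Path("staged/ab12_decision.pdf"))   # no pages
assert not should_embed_pages(True, 3, None)                               # text intake
assert not should_embed_pages(True, 3, Path("staged/ab12_decision.docx"))  # not a PDF
```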

- [ ] **Step 2: Verify import**

Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -c "from legal_mcp.services import ingest; print(ingest.ingest_document.__name__)"`
Expected: prints `ingest_document`.

- [ ] **Step 3: Commit**

```bash
cd ~/legal-ai
git add mcp-server/src/legal_mcp/services/ingest.py
git commit -m "feat(ingest): canonical ingest_document pipeline (FU-1)"
```

> **Note on `create_record` kwargs:** `ingest_document` passes `full_text`, `case_name` (after the display-name fallback), `decision_date` (coerced to `date`), and `document_id`. It forwards every other `inputs` key unchanged; only `case_name`/`decision_date`/`file_path`/`text` are filtered out. The spec's adapter (`_create_external_record` / `_create_internal_record` in Tasks 4-5) maps those keys onto the DB-create's parameters. For example, `citation` becomes `case_number`. When writing the adapters, verify them against the signatures:
> `create_external_case_law(case_number, full_text, court, practice_area, appeal_subtype, subject_tags, summary, headnote, source_type, precedent_level, is_binding, ...)`
> and `create_internal_committee_decision(case_number, full_text, court, chair_name, district, practice_area, appeal_subtype, subject_tags, summary, is_binding, proceeding_type, ...)`.
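
The arguments that `create_record` receives can be replayed on plain dicts. The stdlib-only sketch below rebuilds the forwarding step. It pairs it with a stand-in adapter that renames `citation` to `case_number`, the same rename `_create_external_record` performs. `forward`, `fake_create_external`, and `create` are hypothetical names; the real adapter awaits `db.create_external_case_law`. The last lines show why the explicitly passed keys are filtered out of the spread: a duplicate keyword in a call raises `TypeError`.

```python
from __future__ import annotations

from datetime import date

# Keys ingest_document passes explicitly (or never forwards) to create_record.
EXCLUDED = {"case_name", "decision_date", "file_path", "text"}


def forward(inputs: dict, full_text: str, display_name: str,
            decision_date: date | None, document_id=None) -> dict:
    """Rebuild the kwargs ingest_document hands to spec.create_record."""
    return {
        "full_text": full_text,
        "case_name": display_name,
        "decision_date": decision_date,
        "document_id": document_id,
        **{k: v for k, v in inputs.items() if k not in EXCLUDED},
    }


def fake_create_external(**kw) -> dict:
    """Stand-in for _create_external_record: the adapter does the renaming."""
    return {
        "case_number": kw["citation"].strip(),  # citation -> case_number
        "case_name": kw["case_name"],
        "court": (kw.get("court") or "").strip(),
        "decision_date": kw.get("decision_date"),
    }


inputs = {"citation": " 1234/20 ", "case_name": "", "court": "Supreme Court",
          "decision_date": "2021-03-04"}
kw = forward(inputs, "full text", display_name="1234/20", decision_date=date(2021, 3, 4))
print(sorted(kw))
# ['case_name', 'citation', 'court', 'decision_date', 'document_id', 'full_text']
print(fake_create_external(**kw)["case_number"])  # 1234/20


def create(**kw):
    return kw


# Without the exclusion, a duplicate keyword is a call-time error.
try:
    create(case_name="1234/20", **{"case_name": ""})
except TypeError as exc:
    print(type(exc).__name__)  # TypeError
```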

---

## Task 4: External spec + rewrite `ingest_precedent` as wrapper

**Files:**
- Modify: `mcp-server/src/legal_mcp/services/precedent_library.py`

- [ ] **Step 1: Replace the top-of-file ingest section with a spec + wrapper**

Replace the body of `ingest_precedent` (roughly lines 88-317). Delete the helpers that moved to `ingest.py`: `_stage_file`, `_coerce_date`, `_safe_filename`, and `_embed_precedent_pages`. Keep the `_VALID_PRACTICE_AREAS` and `_VALID_SOURCE_TYPES` values, which the spec's `enum_fields` now consumes. Drop any other `_VALID_*` constant that only the old inline pipeline used. Add:

```python
# Existing module imports (config, db, Path, UUID, ...) stay as they are.
from legal_mcp.services import ingest

PRECEDENT_LIBRARY_DIR = Path(config.DATA_DIR) / "precedent-library"

_VALID_PRACTICE_AREAS = frozenset({"", "rishuy_uvniya", "betterment_levy", "compensation_197"})
_VALID_SOURCE_TYPES = frozenset({"", "court_ruling", "appeals_committee"})


def _external_validate(inputs: dict) -> None:
    citation = (inputs.get("citation") or "").strip()
    if not citation:
        raise ValueError("citation is required")
    # Appeals-committee citations belong on the internal path. The Hebrew message reads:
    # "A citation starting with 'ערר' or 'בל"מ' is an appeals-committee decision. Use
    # internal_decision_upload (requires chair_name + district), not precedent_library_upload."
    if citation.startswith(("ערר ", "ערר(", 'בל"מ ', 'בל"מ(', "ARAR ")):
        raise ValueError(
            "ציטוט שמתחיל ב-'ערר' או 'בל\"מ' הוא החלטת ועדת ערר. "
            "השתמש ב-internal_decision_upload (דורש chair_name + district), "
            "לא ב-precedent_library_upload."
        )


def _external_staging_subdir(inputs: dict) -> str:
    st = inputs.get("source_type") or ""
    return st if st in {"court_ruling", "appeals_committee"} else "other"


async def _create_external_record(**kw) -> dict:
    """Adapter: maps canonical inputs (citation) to create_external_case_law(case_number)."""
    return await db.create_external_case_law(
        case_number=kw["citation"].strip(),
        case_name=kw["case_name"],
        full_text=kw["full_text"],
        court=(kw.get("court") or "").strip(),
        decision_date=kw.get("decision_date"),
        practice_area=kw.get("practice_area", ""),
        appeal_subtype=(kw.get("appeal_subtype") or "").strip(),
        subject_tags=list(kw.get("subject_tags") or []),
        summary=(kw.get("summary") or "").strip(),
        headnote=(kw.get("headnote") or "").strip(),
        source_type=kw.get("source_type", ""),
        precedent_level=kw.get("precedent_level", ""),
        is_binding=kw.get("is_binding", True),
        document_id=kw.get("document_id"),
    )


# Built at import time, so every callable passed here must already be defined above.
_EXTERNAL_SPEC = ingest.IntakeSpec(
    source_kind="external_upload",
    id_field="citation",
    staging_root=PRECEDENT_LIBRARY_DIR,
    staging_subdir=_external_staging_subdir,
    validate=_external_validate,
    enum_fields={"practice_area": _VALID_PRACTICE_AREAS, "source_type": _VALID_SOURCE_TYPES},
    derive=lambda inputs: {},
    display_name_fallback="citation",
    create_record=_create_external_record,
)


async def ingest_precedent(
    *,
    file_path: str | Path,
    citation: str,
    case_name: str = "",
    court: str = "",
    decision_date=None,
    source_type: str = "",
    precedent_level: str = "",
    practice_area: str = "",
    appeal_subtype: str = "",
    subject_tags: list[str] | None = None,
    is_binding: bool = True,
    headnote: str = "",
    summary: str = "",
    document_id: UUID | None = None,
    progress: ingest.ProgressCb | None = None,
) -> dict:
    """Ingest one external precedent. Thin wrapper over the canonical pipeline."""
    inputs = {
        "citation": citation, "case_name": case_name, "court": court,
        "decision_date": decision_date, "source_type": source_type,
        "precedent_level": precedent_level, "practice_area": practice_area,
        "appeal_subtype": appeal_subtype, "subject_tags": subject_tags,
        "is_binding": is_binding, "headnote": headnote, "summary": summary,
    }
    return await ingest.ingest_document(
        _EXTERNAL_SPEC, inputs=inputs, file_path=file_path,
        document_id=document_id, progress=progress,
    )
```

> Keep `_create_external_record` defined above `_EXTERNAL_SPEC`, as in the block above. The `ingest.IntakeSpec(...)` call runs when the module is imported, so a callable defined further down the file raises `NameError` at import.
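
The ordering rule follows from when Python resolves names. The `ingest.IntakeSpec(...)` call executes during import, so each callable passed to it must already be bound. A name used inside a function body, by contrast, is looked up only when the function runs. The stdlib-only demonstration below `exec`s two hypothetical module sources against a toy `Spec` dataclass.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Spec:
    create_record: object


BROKEN = """
SPEC = Spec(create_record=_create)   # evaluated at import: _create is not bound yet
def _create(**kw):
    return kw
"""

FIXED = """
def _create(**kw):
    return helper(kw)                # resolved at call time, so it may be defined later
SPEC = Spec(create_record=_create)
def helper(kw):
    return sorted(kw)
"""


def import_source(src: str) -> dict:
    """Execute module source in a fresh namespace, like a first import would."""
    namespace = {"Spec": Spec}
    exec(src, namespace)
    return namespace


try:
    import_source(BROKEN)
except NameError as exc:
    print("broken:", type(exc).__name__)  # broken: NameError

ns = import_source(FIXED)
print(ns["SPEC"].create_record(b=1, a=2))  # ['a', 'b']
```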

- [ ] **Step 2: Run external-path tests**

Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_unified_ingest.py -k "external" -v`
Expected: `test_external_queues_both`, `test_enum_validation_rejects_bad_practice_area_external`, and
`test_external_citation_guard_still_blocks_arar` PASS.

- [ ] **Step 3: Commit**

```bash
cd ~/legal-ai
git add mcp-server/src/legal_mcp/services/precedent_library.py
git commit -m "refactor(ingest): ingest_precedent delegates to canonical pipeline (FU-1)"
```

---

## Task 5: Internal spec + rewrite `ingest_internal_decision` as wrapper

**Files:**
- Modify: `mcp-server/src/legal_mcp/services/internal_decisions.py`

- [ ] **Step 1: Replace the ingest section with a spec + wrapper**

Remove `_coerce_date`, `_safe_filename`, and the inline pipeline body of `ingest_internal_decision` (roughly lines 73-220). Keep:

- `_VALID_DISTRICTS`, `_COURT_TO_DISTRICT`, and `_district_from_court`;
- the existing `derive_proceeding_type` import or definition, which `_internal_derive` below calls;
- all migrate_*/enrich_*/search_internal functions.

Then add the following. Do not define `_VALID_DISTRICTS` a second time.

```python
# Existing module imports (config, db, Path, UUID, ...) stay as they are.
from legal_mcp.services import ingest

INTERNAL_DECISIONS_DIR = Path(config.DATA_DIR) / "internal-decisions"

_VALID_PRACTICE_AREAS = frozenset({"", "rishuy_uvniya", "betterment_levy", "compensation_197"})
# _VALID_DISTRICTS already exists in this module; keep it as-is:
# frozenset({"", "ירושלים", "מרכז", "תל אביב", "צפון", "דרום", "ארצי"})
# (Jerusalem, Center, Tel Aviv, North, South, National)


def _internal_validate(inputs: dict) -> None:
    if not (inputs.get("case_number") or "").strip():
        raise ValueError("case_number is required")


def _internal_derive(inputs: dict) -> dict:
    """Fill district from the court, and proceeding_type from subtype/subject, when missing."""
    district = (inputs.get("district") or "").strip() or _district_from_court(inputs.get("court") or "")
    proc = (inputs.get("proceeding_type") or "").strip() or derive_proceeding_type(
        appeal_subtype=inputs.get("appeal_subtype") or "", subject=inputs.get("case_name") or "",
    )
    return {"district": district, "proceeding_type": proc}


async def _create_internal_record(**kw) -> dict:
    return await db.create_internal_committee_decision(
        case_number=kw["case_number"].strip(),
        case_name=kw["case_name"],
        full_text=kw["full_text"],
        court=(kw.get("court") or "").strip(),
        decision_date=kw.get("decision_date"),
        chair_name=(kw.get("chair_name") or "").strip(),
        district=kw.get("district", ""),
        practice_area=kw.get("practice_area", ""),
        appeal_subtype=(kw.get("appeal_subtype") or "").strip(),
        subject_tags=list(kw.get("subject_tags") or []),
        summary=(kw.get("summary") or "").strip(),
        is_binding=kw.get("is_binding", True),
        document_id=kw.get("document_id"),
        proceeding_type=kw.get("proceeding_type") or "ערר",
    )


_INTERNAL_SPEC = ingest.IntakeSpec(
    source_kind="internal_committee",
    id_field="case_number",
    staging_root=INTERNAL_DECISIONS_DIR,
    staging_subdir=lambda inputs: (inputs.get("district") or "other"),
    validate=_internal_validate,
    enum_fields={"practice_area": _VALID_PRACTICE_AREAS, "district": _VALID_DISTRICTS},
    derive=_internal_derive,
    display_name_fallback="case_number",
    create_record=_create_internal_record,
)


async def ingest_internal_decision(
    *,
    case_number: str,
    case_name: str = "",
    court: str = "",
    decision_date=None,
    chair_name: str = "",
    district: str = "",
    practice_area: str = "",
    appeal_subtype: str = "",
    subject_tags: list[str] | None = None,
    summary: str = "",
    is_binding: bool = True,
    file_path: str | Path | None = None,
    text: str | None = None,
    document_id: UUID | None = None,
    queue_halachot: bool = True,  # retained for signature compat; pipeline always queues
    proceeding_type: str = "",
) -> dict:
    """Ingest one appeals-committee decision. Thin wrapper over the canonical pipeline."""
    inputs = {
        "case_number": case_number, "case_name": case_name, "court": court,
        "decision_date": decision_date, "chair_name": chair_name, "district": district,
        "practice_area": practice_area, "appeal_subtype": appeal_subtype,
        "subject_tags": subject_tags, "summary": summary, "is_binding": is_binding,
        "proceeding_type": proceeding_type,
    }
    out = await ingest.ingest_document(
        _INTERNAL_SPEC, inputs=inputs, file_path=file_path, text=text,
        document_id=document_id,
    )
    return {"status": out["status"], "case_law_id": out["case_law_id"],
            "chunks": out["chunks"], "halachot_pending": True}
```

> Only `migrate_from_style_corpus` used `queue_halachot=False`. The canonical pipeline always queues both extractions (per INV-ING3). During execution, confirm with the user that queueing during bulk re-migration is acceptable. The migrate path is out of FU-1 scope, but it calls this wrapper. If suppression is still required, record it as a follow-up; do not drop it silently.

- [ ] **Step 2: Run the full test file**

Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_unified_ingest.py -v`
Expected: ALL 9 tests PASS, including `test_internal_queues_BOTH_metadata_and_halacha` (GAP-02).

- [ ] **Step 3: Commit**

```bash
cd ~/legal-ai
git add mcp-server/src/legal_mcp/services/internal_decisions.py
git commit -m "refactor(ingest): ingest_internal_decision delegates to canonical pipeline; queue metadata too (GAP-02, FU-1)"
```

---

## Task 6: Dead-code sweep, smoke import, full suite

**Files:**
- Verify: `mcp-server/src/legal_mcp/services/precedent_library.py`, `internal_decisions.py`

- [ ] **Step 1: Confirm no orphaned references to removed helpers**

Run: `cd ~/legal-ai/mcp-server && grep -rn "_embed_precedent_pages\|_stage_file\|_safe_filename\|_coerce_date" src/legal_mcp/services/precedent_library.py src/legal_mcp/services/internal_decisions.py`
Expected: no matches, because all four helpers now live in `ingest.py`. If a match comes from a non-ingest code path that still calls the helper, keep (or restore) that helper in its module. If the reference is orphaned, delete it.
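
`grep` also matches comments, docstrings, and log strings, so a hit is not necessarily a live reference. When the result is ambiguous, an AST pass can separate real uses from text. It can also check whether the module still defines or imports each name. The stdlib-only sketch below does this. `find_orphans` is a hypothetical helper and `SAMPLE` is made-up source; in practice you would pass the contents of the two service files.

```python
from __future__ import annotations

import ast

REMOVED = {"_embed_precedent_pages", "_stage_file", "_safe_filename", "_coerce_date"}


def find_orphans(source: str, names: set[str] = REMOVED) -> dict[str, list[int]]:
    """Return {helper: [line, ...]} for removed helpers used but not defined or imported."""
    defined: set[str] = set()
    used: dict[str, list[int]] = {}
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            defined.add(node.name)
        elif isinstance(node, (ast.Import, ast.ImportFrom)):
            defined.update((alias.asname or alias.name).split(".")[0] for alias in node.names)
        elif isinstance(node, ast.Name) and node.id in names:
            # Bare names only: comments and strings never become ast.Name nodes, and a
            # qualified use such as ingest._stage_file is an ast.Attribute instead.
            used.setdefault(node.id, []).append(node.lineno)
    return {name: sorted(lines) for name, lines in used.items() if name not in defined}


SAMPLE = '''
"""Docstring mentioning _coerce_date is ignored."""
from legal_mcp.services.ingest import _safe_filename

def reextract(row):
    # _stage_file(...) in a comment is ignored too
    return _coerce_date(row["date"]), _safe_filename(row["name"])
'''

print(find_orphans(SAMPLE))  # {'_coerce_date': [7]}
```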

- [ ] **Step 2: Smoke-import every affected module + its callers**

Run:
```bash
cd ~/legal-ai/mcp-server && .venv/bin/python -c "
from legal_mcp.services import ingest, precedent_library, internal_decisions
from legal_mcp.tools import precedent_library as t1, internal_decisions as t2
import inspect
sig_p = inspect.signature(precedent_library.ingest_precedent)
sig_i = inspect.signature(internal_decisions.ingest_internal_decision)
assert 'citation' in sig_p.parameters and 'file_path' in sig_p.parameters
assert 'case_number' in sig_i.parameters and 'text' in sig_i.parameters
print('signatures preserved; imports clean')
"
```
Expected: prints `signatures preserved; imports clean`.

- [ ] **Step 3: Run the entire test suite (no regressions elsewhere)**

Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/ -q`
Expected: all pre-existing tests still pass, plus the 9 new ones.

- [ ] **Step 4: Lint the changed files (match repo style)**

Run: `cd ~/legal-ai/mcp-server && if .venv/bin/python -m ruff --version >/dev/null 2>&1; then .venv/bin/python -m ruff check src/legal_mcp/services/ingest.py src/legal_mcp/services/precedent_library.py src/legal_mcp/services/internal_decisions.py; else echo "ruff not installed, skip"; fi`
Expected: clean, or `ruff not installed, skip`. When ruff is installed, any lint errors are reported rather than masked by the fallback message.

- [ ] **Step 5: Update TaskMaster #59 → done**

Mark subtasks 59.1-59.4 and task 59 as done via task-master (verify via MCP get_task).

- [ ] **Step 6: Final commit**

If the sweep changed nothing, skip this commit; git refuses to create an empty commit.

```bash
cd ~/legal-ai
git add -A mcp-server/
git commit -m "chore(ingest): dead-code sweep + smoke checks for unified pipeline (FU-1)"
```

---

## Self-Review Notes

- **GAP-01** (single path) → Tasks 2-5. **GAP-02** (metadata queue) → Task 3 step 1 + test `test_internal_queues_BOTH_metadata_and_halacha`. **GAP-04** (enum validation) → `_validate_enums` + tests. **GAP-05** (staging/derive/multimodal/fallback/guard unified) → Task 3 + specs in Tasks 4-5. The multimodal gate itself is not exercised by the offline tests, which force `MULTIMODAL_ENABLED=False`.
- **Boundary preserved:** DB-create functions untouched (routed via `create_record`); no migration.
- **Open execution check:** `queue_halachot=False` suppression in `migrate_from_style_corpus` (Task 5 note). Surface it to the user; do not silently change bulk-migration behavior.
- **claude_session rule:** `ingest.py` imports only `config` and db/chunker/embeddings/extractor, with no LLM extractors, so it is safe to call from the container.
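
The claude_session rule could also be enforced by a test rather than by review. The test would parse `ingest.py` and fail if any import names either LLM extractor, including imports nested inside functions. The stdlib-only sketch below runs that check on inline samples. `forbidden_imports` is a hypothetical helper; a real test would read `src/legal_mcp/services/ingest.py` instead.

```python
from __future__ import annotations

import ast

FORBIDDEN = {"halacha_extractor", "precedent_metadata_extractor"}


def forbidden_imports(source: str, forbidden: set[str] = FORBIDDEN) -> set[str]:
    """Return forbidden module names imported anywhere in the source (top level or nested)."""
    hits: set[str] = set()
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            names = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom):
            # Covers `from pkg import mod`, `from pkg.mod import fn`, and relative imports.
            names = [node.module or ""] + [alias.name for alias in node.names]
        else:
            continue
        for name in names:
            hits.update(part for part in name.split(".") if part in forbidden)
    return hits


OK_SRC = "from legal_mcp.services import chunker, db, embeddings, extractor\n"
BAD_SRC = "def later():\n    from legal_mcp.services import halacha_extractor\n"

print(forbidden_imports(OK_SRC))   # set()
print(forbidden_imports(BAD_SRC))  # {'halacha_extractor'}
```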