Merge pull request 'FU-1: איחוד מסלול-הקליטה למסלול קנוני אחד (GAP-01/02/04/05)' (#11) from fix/fu1-unified-ingest into main
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 1m51s

This commit was merged in pull request #11.
This commit is contained in:
2026-05-30 19:37:33 +00:00
7 changed files with 1548 additions and 450 deletions

View File

@@ -2007,7 +2007,7 @@
"description": "מאחד את ingest_precedent ו-ingest_internal_decision למסלול קנוני יחיד; מבטל את האסימטריות.",
"details": "מכסה GAP-01,02,04,05. מספק INV-ING1/ING3/G2/G4. severity: Critical. סוג: קוד. יסוד — FU-2/FU-3/FU-7 תלויים בו. מקור: docs/spec/gap-audit.md + 01-ingest.md.",
"testStrategy": "",
"status": "pending",
"status": "done",
"dependencies": [],
"priority": "high",
"subtasks": [
@@ -2017,7 +2017,7 @@
"description": "ביטול שני המסלולים המקבילים (precedent_library.py:88 vs internal_decisions.py:73); כל סוג = פרמטרים, לא פונקציה נפרדת.",
"dependencies": [],
"details": "INV-ING1/G2",
"status": "pending",
"status": "done",
"testStrategy": "",
"parentId": "59"
},
@@ -2027,7 +2027,7 @@
"description": "קריאה ל-request_metadata_extraction גם במסלול הפנימי (היום רק halacha, internal_decisions.py:208).",
"dependencies": [],
"details": "INV-ING3/DM1",
"status": "pending",
"status": "done",
"testStrategy": "",
"parentId": "59"
},
@@ -2037,7 +2037,7 @@
"description": "הוספת ולידציית practice_area/source_type למסלול הפנימי (כמו precedent_library.py:131-134).",
"dependencies": [],
"details": "INV-G4",
"status": "pending",
"status": "done",
"testStrategy": "",
"parentId": "59"
},
@@ -2047,7 +2047,7 @@
"description": "שאר 6 האסימטריות → פרמטרים של המסלול הקנוני (01-ingest §4).",
"dependencies": [],
"details": "INV-ING1",
"status": "pending",
"status": "done",
"testStrategy": "",
"parentId": "59"
}
@@ -2140,6 +2140,18 @@
"status": "pending",
"testStrategy": "",
"parentId": "61"
},
{
"id": 2,
"title": "[backfill] multimodal page-images ל-42 החלטות-ועדה קיימות",
"description": "42/56 רשומות source_kind='internal_committee' נקלטו במסלול הישן בלי multimodal page-images (FU-1 מתקן רק קדימה). אחרי שמנגנון ה-re-index של FU-3 קיים — להריץ re-embed של עמודי-תמונה עליהן. ⚠️ קודם לכמת כמה מה-42 הן PDF-backed (לרשומות שנקלטו מ-text בלבד אין קובץ → אי-אפשר להטמיע עמודים). רק PDF-backed רלוונטיות.",
"dependencies": [
1
],
"details": "מקור: בדיקת DB 2026-05-30 (precedent_image_embeddings JOIN case_law). internal_committee: 14/56 עם page-images, 42 בלי. נגזר מ-GAP-02/FU-1 boundary discussion. לא פער-תקינות — שיפור multimodal coverage.",
"status": "pending",
"testStrategy": "",
"parentId": "61"
}
],
"updatedAt": "2026-05-30T17:37:34.741136+00:00"

View File

@@ -0,0 +1,833 @@
# FU-1 Unified Ingest Path — Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Collapse the two parallel ingest functions (`ingest_precedent`, `ingest_internal_decision`) into one canonical pipeline parameterized by an `IntakeSpec`, closing GAP-01/02/04/05.
**Architecture:** New module `services/ingest.py` holds a Template-Method skeleton `ingest_document(spec, ...)`; per-type variation rides on a frozen `IntakeSpec` config object (staging resolver, validate callable, enum_fields data, derive callable, display-name fallback, injected `create_record`). The two existing public functions stay as named entry points that build a spec and delegate. The DB-create functions are NOT merged (FU-2 boundary) — only routed via `spec.create_record`.
**Tech Stack:** Python 3.12, asyncpg, pytest (offline, monkeypatched I/O), local `.venv` at `mcp-server/.venv`.
**Spec:** [docs/superpowers/specs/2026-05-30-fu1-unified-ingest-design.md](../specs/2026-05-30-fu1-unified-ingest-design.md)
**Run tests with:** `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_unified_ingest.py -v`
---
## File Structure
- **Create** `mcp-server/src/legal_mcp/services/ingest.py` — canonical pipeline + `IntakeSpec` + shared helpers (`_stage_file`, `_coerce_date`, `_safe_filename`, `_embed_pages`).
- **Create** `mcp-server/tests/test_unified_ingest.py` — offline behavioral tests.
- **Modify** `mcp-server/src/legal_mcp/services/precedent_library.py``ingest_precedent` becomes a thin wrapper building `_EXTERNAL_SPEC`; delete inline pipeline + moved helpers; keep everything else (search, reextract, process_pending, list, delete, get).
- **Modify** `mcp-server/src/legal_mcp/services/internal_decisions.py``ingest_internal_decision` becomes a thin wrapper building `_INTERNAL_SPEC`; delete inline pipeline + moved helpers; keep migrate_*, enrich_*, search_internal.
**Unchanged callers (verify, don't edit):** `tools/precedent_library.py`, `tools/internal_decisions.py`, `web/` HTTP handlers — they call the two public functions whose signatures are preserved.
---
## Task 1: Failing tests for the unified pipeline
**Files:**
- Test: `mcp-server/tests/test_unified_ingest.py`
- [ ] **Step 1: Write the failing tests**
```python
"""FU-1: unified ingest pipeline tests (offline, all I/O monkeypatched).
Proves both intake types flow through services.ingest.ingest_document and that
the canonical pipeline is symmetric: BOTH metadata and halacha extraction are
queued for BOTH types (GAP-02 regression), enum validation applies to both
(GAP-04), multimodal is gated by flag+PDF not by intake type (GAP-05), and the
external citation guard is preserved.
"""
from __future__ import annotations
import asyncio
from pathlib import Path
from uuid import uuid4
import pytest
from legal_mcp import config
from legal_mcp.services import db, embeddings, chunker, extractor
from legal_mcp.services import ingest, precedent_library, internal_decisions
def _run(coro):
return asyncio.run(coro)
class _Chunk:
def __init__(self, i):
self.chunk_index = i
self.content = f"chunk-{i}"
self.section_type = "body"
self.page_number = 1
self.role = "child"
self.local_id = f"c{i}"
self.parent_local_id = None
@pytest.fixture()
def patched(monkeypatch, tmp_path):
"""Patch every I/O boundary. Record queue + create calls."""
calls = {"metadata": [], "halacha": [], "create": [], "chunks": [], "pages": []}
async def _extract_text(path):
return ("full decision text", 2, [0, 100])
def _strip(text):
return text
def _chunk(text, page_offsets=None):
return [_Chunk(0), _Chunk(1)]
async def _embed(texts, input_type="document"):
return [[0.0] * 8 for _ in texts]
async def _store_chunks(cid, dicts):
calls["chunks"].append((cid, len(dicts)))
return len(dicts)
async def _create_external(**kw):
calls["create"].append(("external", kw))
return {"id": uuid4()}
async def _create_internal(**kw):
calls["create"].append(("internal", kw))
return {"id": uuid4()}
async def _req_meta(cid):
calls["metadata"].append(cid)
async def _req_hal(cid):
calls["halacha"].append(cid)
async def _set_status(cid, status):
return None
monkeypatch.setattr(extractor, "extract_text", _extract_text)
monkeypatch.setattr(extractor, "strip_nevo_preamble", _strip)
monkeypatch.setattr(chunker, "chunk_document", _chunk)
monkeypatch.setattr(embeddings, "embed_texts", _embed)
monkeypatch.setattr(db, "store_precedent_chunks", _store_chunks)
monkeypatch.setattr(db, "create_external_case_law", _create_external)
monkeypatch.setattr(db, "create_internal_committee_decision", _create_internal)
monkeypatch.setattr(db, "request_metadata_extraction", _req_meta)
monkeypatch.setattr(db, "request_halacha_extraction", _req_hal)
monkeypatch.setattr(db, "set_case_law_extraction_status", _set_status)
monkeypatch.setattr(db, "set_case_law_halacha_status", _set_status)
# Force flat chunking + multimodal OFF unless a test flips it.
monkeypatch.setattr(config, "PARENT_DOC_RETRIEVAL_ENABLED", False)
monkeypatch.setattr(config, "MULTIMODAL_ENABLED", False)
return calls
def _make_pdf(tmp_path) -> str:
p = tmp_path / "decision.pdf"
p.write_bytes(b"%PDF-1.4 fake")
return str(p)
def test_internal_queues_BOTH_metadata_and_halacha(patched, tmp_path):
"""GAP-02 regression: the internal path must queue metadata too."""
_run(internal_decisions.ingest_internal_decision(
case_number="8046/24", text="decision text", chair_name="דפנה תמיר",
district="ירושלים", practice_area="betterment_levy",
))
assert len(patched["metadata"]) == 1, "internal path must queue metadata (GAP-02)"
assert len(patched["halacha"]) == 1
def test_external_queues_both(patched, tmp_path):
_run(precedent_library.ingest_precedent(
file_path=_make_pdf(tmp_path), citation="עע\"מ 1234/20",
practice_area="rishuy_uvniya", source_type="court_ruling",
))
assert len(patched["metadata"]) == 1
assert len(patched["halacha"]) == 1
def test_both_types_go_through_ingest_document(patched, tmp_path, monkeypatch):
seen = []
real = ingest.ingest_document
async def _spy(spec, **kw):
seen.append(spec.source_kind)
return await real(spec, **kw)
monkeypatch.setattr(ingest, "ingest_document", _spy)
_run(internal_decisions.ingest_internal_decision(
case_number="8046/24", text="t", chair_name="דפנה תמיר", practice_area="betterment_levy"))
_run(precedent_library.ingest_precedent(
file_path=_make_pdf(tmp_path), citation="עע\"מ 1/20", practice_area="rishuy_uvniya"))
assert seen == ["internal_committee", "external_upload"]
def test_enum_validation_rejects_bad_practice_area_internal(patched, tmp_path):
"""GAP-04: internal path must validate enums like the external one."""
with pytest.raises(ValueError, match="practice_area"):
_run(internal_decisions.ingest_internal_decision(
case_number="8046/24", text="t", chair_name="x", practice_area="bogus"))
def test_enum_validation_rejects_bad_practice_area_external(patched, tmp_path):
with pytest.raises(ValueError, match="practice_area"):
_run(precedent_library.ingest_precedent(
file_path=_make_pdf(tmp_path), citation="עע\"מ 1/20", practice_area="bogus"))
def test_external_citation_guard_still_blocks_arar(patched, tmp_path):
with pytest.raises(ValueError, match="ערר"):
_run(precedent_library.ingest_precedent(
file_path=_make_pdf(tmp_path), citation="ערר 1234/24"))
def test_internal_text_path_works_without_file(patched):
out = _run(internal_decisions.ingest_internal_decision(
case_number="8046/24", text="t", chair_name="x", practice_area="betterment_levy"))
assert out["status"] == "completed"
assert out["case_law_id"]
def test_internal_requires_file_or_text(patched):
with pytest.raises(ValueError, match="file_path or text"):
_run(internal_decisions.ingest_internal_decision(
case_number="8046/24", chair_name="x", practice_area="betterment_levy"))
def test_display_name_fallback_uses_canonical_id(patched, tmp_path):
_run(internal_decisions.ingest_internal_decision(
case_number="8046/24", text="t", chair_name="x", practice_area="betterment_levy"))
kind, kw = patched["create"][0]
assert kw["case_name"] == "8046/24", "missing case_name falls back to canonical id"
```
- [ ] **Step 2: Run tests to verify they fail**
Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_unified_ingest.py -v`
Expected: FAIL — `ModuleNotFoundError: No module named 'legal_mcp.services.ingest'` (or ImportError).
- [ ] **Step 3: Commit the red tests**
```bash
cd ~/legal-ai
git add mcp-server/tests/test_unified_ingest.py
git commit -m "test(ingest): failing tests for unified pipeline (FU-1)"
```
---
## Task 2: Canonical module `ingest.py` — IntakeSpec + shared helpers
**Files:**
- Create: `mcp-server/src/legal_mcp/services/ingest.py`
- [ ] **Step 1: Write the module header, IntakeSpec, and shared helpers**
```python
"""Canonical ingest pipeline (FU-1).
One pipeline for all sibling-entity intake types (external precedent,
internal committee decision). Per-type variation rides on an ``IntakeSpec``
config object — never a parallel function. See
docs/spec/01-ingest.md and docs/superpowers/specs/2026-05-30-fu1-unified-ingest-design.md.
claude_session rule preserved: this module only QUEUES extraction
(``request_*_extraction`` = pure DB writes). It never imports
halacha_extractor / precedent_metadata_extractor, so it is safe to call
from the FastAPI container where the ``claude`` CLI is unavailable.
"""
from __future__ import annotations
import asyncio
import logging
import re
import shutil
from dataclasses import dataclass
from datetime import date
from pathlib import Path
from typing import Awaitable, Callable
from uuid import UUID, uuid4
from legal_mcp import config
from legal_mcp.services import chunker, db, embeddings, extractor
logger = logging.getLogger(__name__)
ProgressCb = Callable[[str, int, str], Awaitable[None]]
async def _noop_progress(_status: str, _percent: int, _msg: str) -> None:
return None
@dataclass(frozen=True)
class IntakeSpec:
"""Describes everything that varies between intake types."""
source_kind: str
id_field: str
staging_root: Path
staging_subdir: Callable[[dict], str]
validate: Callable[[dict], None]
enum_fields: dict[str, frozenset[str]]
derive: Callable[[dict], dict]
display_name_fallback: str
create_record: Callable[..., Awaitable[dict]]
def _coerce_date(value) -> date | None:
if value is None or value == "":
return None
if isinstance(value, date):
return value
if isinstance(value, str):
try:
return date.fromisoformat(value[:10])
except ValueError:
return None
return None
def _safe_filename(name: str) -> str:
base = Path(name).name
return re.sub(r"[^\w.\-+א-ת ]", "_", base) or f"upload-{uuid4().hex[:8]}"
def _stage_file(src_path: Path, root: Path, subdir: str) -> Path:
dest_dir = root / (subdir or "other")
dest_dir.mkdir(parents=True, exist_ok=True)
dest = dest_dir / f"{uuid4().hex[:8]}_{_safe_filename(src_path.name)}"
shutil.copy2(src_path, dest)
return dest
def _validate_enums(spec: IntakeSpec, inputs: dict) -> None:
for field_name, allowed in spec.enum_fields.items():
value = inputs.get(field_name, "") or ""
if value not in allowed:
raise ValueError(f"invalid {field_name}: {value!r}")
```
- [ ] **Step 2: Add the multimodal page-embed helper (moved verbatim from precedent_library.py)**
```python
async def _embed_pages(case_law_id: UUID, pdf_path: Path, page_count: int) -> dict:
"""Render PDF pages → embed via voyage-multimodal → store. Non-fatal caller."""
thumb_dir = spec_thumb_dir(case_law_id)
rendered = await asyncio.to_thread(
extractor.render_pages_for_multimodal,
pdf_path, config.MULTIMODAL_DPI, config.MULTIMODAL_THUMB_DPI, thumb_dir,
)
images = [pil for pil, _ in rendered]
thumbs = [t for _, t in rendered]
img_embs = await embeddings.embed_images(images)
page_records = []
for i, (emb, thumb) in enumerate(zip(img_embs, thumbs)):
rel_thumb = None
if thumb is not None:
try:
rel_thumb = str(thumb.relative_to(config.DATA_DIR))
except ValueError:
rel_thumb = str(thumb)
page_records.append({
"page_number": i + 1, "embedding": emb, "image_thumbnail_path": rel_thumb,
})
stored = await db.store_precedent_image_embeddings(
case_law_id, page_records, model_name=config.MULTIMODAL_MODEL,
)
logger.info("Multimodal: stored %d page-image embeddings for case_law %s", stored, case_law_id)
return {"pages_embedded": stored}
def spec_thumb_dir(case_law_id: UUID) -> Path:
"""Thumbnails live under the precedent-library tree regardless of intake type."""
return Path(config.DATA_DIR) / "precedent-library" / "thumbnails" / str(case_law_id)
```
- [ ] **Step 3: Verify the module imports cleanly**
Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -c "from legal_mcp.services import ingest; print(ingest.IntakeSpec.__name__)"`
Expected: prints `IntakeSpec`, no error.
- [ ] **Step 4: Commit**
```bash
cd ~/legal-ai
git add mcp-server/src/legal_mcp/services/ingest.py
git commit -m "feat(ingest): IntakeSpec + shared helpers for canonical pipeline (FU-1)"
```
---
## Task 3: Canonical `ingest_document`
**Files:**
- Modify: `mcp-server/src/legal_mcp/services/ingest.py` (append `ingest_document`)
- [ ] **Step 1: Append the canonical pipeline function**
```python
async def ingest_document(
spec: IntakeSpec,
*,
inputs: dict,
file_path: str | Path | None = None,
text: str | None = None,
document_id: UUID | None = None,
progress: ProgressCb | None = None,
) -> dict:
"""Run the canonical 12-step pipeline for one intake item.
``inputs`` carries the type-specific record fields (citation/case_number,
case_name, court, practice_area, etc.). ``spec`` decides how they are
validated, staged, derived, and which DB-create runs. Returns a dict with
at least: status, case_law_id, chunks.
"""
progress = progress or _noop_progress
# Step 1: input validation (type-specific) + enums (uniform mechanism).
if not file_path and text is None:
raise ValueError("either file_path or text is required")
spec.validate(inputs)
_validate_enums(spec, inputs)
# Step 2: field derivation (identity for external).
inputs = {**inputs, **spec.derive(inputs)}
# Steps 3-5: stage (if file) + extract + strip.
page_count = 0
page_offsets = None
staged: Path | None = None
if file_path:
src = Path(file_path)
if not src.is_file():
raise FileNotFoundError(f"file not found: {src}")
await progress("staging", 5, "מעתיק את הקובץ לאחסון")
staged = _stage_file(src, spec.staging_root, spec.staging_subdir(inputs))
await progress("extracting", 15, "מחלץ טקסט מהקובץ")
try:
raw_text, page_count, page_offsets = await extractor.extract_text(str(staged))
except Exception as e:
await progress("failed", 100, f"כשל בחילוץ טקסט: {e}")
raise
raw_text = extractor.strip_nevo_preamble((raw_text or "")).strip()
else:
raw_text = (text or "").strip()
if not raw_text:
await progress("failed", 100, "לא נמצא טקסט בקובץ")
raise ValueError("no extractable text in file")
# Step 6: DB create (type-specific, routed — get case_law_id).
await progress("storing_metadata", 25, "שומר את הרשומה במסד הנתונים")
display_name = (inputs.get("case_name") or "").strip() or (
inputs.get(spec.display_name_fallback) or ""
).strip()
record = await spec.create_record(
full_text=raw_text,
case_name=display_name,
decision_date=_coerce_date(inputs.get("decision_date")),
document_id=document_id,
**{k: v for k, v in inputs.items()
if k not in {"case_name", "decision_date", "file_path", "text"}},
)
case_law_id = UUID(str(record["id"]))
try:
stored_chunks = await _chunk_embed_store(case_law_id, raw_text, page_offsets, page_count, progress)
# Step 9: multimodal — uniform: flag + PDF + page_count, NOT intake type.
if (config.MULTIMODAL_ENABLED and page_count > 0
and staged is not None and staged.suffix.lower() == ".pdf"):
try:
await progress("embedding_images", 70, f"מטמיע {page_count} עמודי תמונה (multimodal)")
await _embed_pages(case_law_id, staged, page_count)
except Exception as e:
logger.warning("Multimodal embedding failed (non-fatal): %s", e)
# Steps 10-12: queue BOTH extractions (GAP-02 fix) + statuses.
await db.set_case_law_extraction_status(case_law_id, "completed")
await db.set_case_law_halacha_status(case_law_id, "pending")
await db.request_metadata_extraction(case_law_id)
await db.request_halacha_extraction(case_law_id)
await progress("completed", 100,
f"נקלט: {stored_chunks} chunks. חילוץ הלכות ומטא-דאטה ממתינים בתור.")
return {
"status": "completed",
"case_law_id": str(case_law_id),
"chunks": stored_chunks,
"halachot": 0,
"halachot_pending": True,
"metadata_filled": [],
"pages": page_count,
}
except Exception as e:
logger.exception("ingest_document failed (%s): %s", spec.source_kind, e)
await db.set_case_law_extraction_status(case_law_id, "failed")
await progress("failed", 100, f"כשל בעיבוד: {e}")
raise
async def _chunk_embed_store(case_law_id, text, page_offsets, page_count, progress) -> int:
"""Steps 7-8: chunk (hierarchical/flat by flag) → embed children → store."""
if config.PARENT_DOC_RETRIEVAL_ENABLED:
await progress("chunking", 40, f"מחלק את הטקסט ל-chunks היררכיים ({page_count} עמ')")
h_chunks = chunker.chunk_document_hierarchical(text, page_offsets=page_offsets)
if not h_chunks:
return 0
children = [c for c in h_chunks if c.role == "child"]
parents = [c for c in h_chunks if c.role == "parent"]
await progress("embedding", 55, f"מייצר embeddings ל-{len(children)} children ({len(parents)} parents)")
child_vectors = await embeddings.embed_texts([c.content for c in children], input_type="document")
chunk_dicts: list[dict] = []
for p in parents:
chunk_dicts.append({
"role": "parent", "local_id": p.local_id, "parent_local_id": None,
"chunk_index": p.chunk_index, "content": p.content,
"section_type": p.section_type, "page_number": p.page_number, "embedding": None,
})
for c, v in zip(children, child_vectors):
chunk_dicts.append({
"role": "child", "local_id": c.local_id, "parent_local_id": c.parent_local_id,
"chunk_index": c.chunk_index, "content": c.content,
"section_type": c.section_type, "page_number": c.page_number, "embedding": v,
})
counts = await db.store_precedent_chunks_hierarchical(case_law_id, chunk_dicts)
return counts["children"]
else:
await progress("chunking", 40, f"מחלק את הטקסט ל-chunks ({page_count} עמ')")
chunks = chunker.chunk_document(text, page_offsets=page_offsets)
if not chunks:
return 0
await progress("embedding", 55, f"מייצר embeddings ל-{len(chunks)} chunks")
chunk_vectors = await embeddings.embed_texts([c.content for c in chunks], input_type="document")
chunk_dicts = [
{"chunk_index": c.chunk_index, "content": c.content,
"section_type": c.section_type, "page_number": c.page_number, "embedding": v}
for c, v in zip(chunks, chunk_vectors)
]
return await db.store_precedent_chunks(case_law_id, chunk_dicts)
```
- [ ] **Step 2: Verify import**
Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -c "from legal_mcp.services import ingest; print(ingest.ingest_document.__name__)"`
Expected: prints `ingest_document`.
- [ ] **Step 3: Commit**
```bash
cd ~/legal-ai
git add mcp-server/src/legal_mcp/services/ingest.py
git commit -m "feat(ingest): canonical ingest_document pipeline (FU-1)"
```
> **Note on `create_record` kwargs:** the wrappers (Tasks 4-5) build `inputs` so the
> leftover keys after popping `case_name`/`decision_date`/`file_path`/`text` exactly match
> each DB-create's remaining parameters. Verify against the signatures:
> `create_external_case_law(case_number, full_text, court, practice_area, appeal_subtype, subject_tags, summary, headnote, source_type, precedent_level, is_binding, ...)`
> and `create_internal_committee_decision(case_number, full_text, court, chair_name, district, practice_area, appeal_subtype, subject_tags, summary, is_binding, proceeding_type, ...)`.
---
## Task 4: External spec + rewrite `ingest_precedent` as wrapper
**Files:**
- Modify: `mcp-server/src/legal_mcp/services/precedent_library.py`
- [ ] **Step 1: Replace the top-of-file ingest section with a spec + wrapper**
Replace the body of `ingest_precedent` (lines ~88-317) and remove `_stage_file`, `_coerce_date`,
`_safe_filename`, `_embed_precedent_pages`, and the `_VALID_*` constants used only by ingest.
Keep `_VALID_PRACTICE_AREAS`/`_VALID_SOURCE_TYPES` values but move them into the spec. Add:
```python
from legal_mcp.services import ingest
PRECEDENT_LIBRARY_DIR = Path(config.DATA_DIR) / "precedent-library"
_VALID_PRACTICE_AREAS = frozenset({"", "rishuy_uvniya", "betterment_levy", "compensation_197"})
_VALID_SOURCE_TYPES = frozenset({"", "court_ruling", "appeals_committee"})
def _external_validate(inputs: dict) -> None:
citation = (inputs.get("citation") or "").strip()
if not citation:
raise ValueError("citation is required")
if citation.startswith(("ערר ", "ערר(", 'בל"מ ', 'בל"מ(', "ARAR ")):
raise ValueError(
"ציטוט שמתחיל ב-'ערר' או 'בל\"מ' הוא החלטת ועדת ערר. "
"השתמש ב-internal_decision_upload (דורש chair_name + district), "
"לא ב-precedent_library_upload."
)
def _external_staging_subdir(inputs: dict) -> str:
st = inputs.get("source_type") or ""
return st if st in {"court_ruling", "appeals_committee"} else "other"
_EXTERNAL_SPEC = ingest.IntakeSpec(
source_kind="external_upload",
id_field="citation",
staging_root=PRECEDENT_LIBRARY_DIR,
staging_subdir=_external_staging_subdir,
validate=_external_validate,
enum_fields={"practice_area": _VALID_PRACTICE_AREAS, "source_type": _VALID_SOURCE_TYPES},
derive=lambda inputs: {},
display_name_fallback="citation",
create_record=_create_external_record,
)
async def _create_external_record(**kw) -> dict:
"""Adapter: maps canonical inputs (citation) to create_external_case_law(case_number)."""
return await db.create_external_case_law(
case_number=kw["citation"].strip(),
case_name=kw["case_name"],
full_text=kw["full_text"],
court=(kw.get("court") or "").strip(),
decision_date=kw.get("decision_date"),
practice_area=kw.get("practice_area", ""),
appeal_subtype=(kw.get("appeal_subtype") or "").strip(),
subject_tags=list(kw.get("subject_tags") or []),
summary=(kw.get("summary") or "").strip(),
headnote=(kw.get("headnote") or "").strip(),
source_type=kw.get("source_type", ""),
precedent_level=kw.get("precedent_level", ""),
is_binding=kw.get("is_binding", True),
document_id=kw.get("document_id"),
)
async def ingest_precedent(
*,
file_path: str | Path,
citation: str,
case_name: str = "",
court: str = "",
decision_date=None,
source_type: str = "",
precedent_level: str = "",
practice_area: str = "",
appeal_subtype: str = "",
subject_tags: list[str] | None = None,
is_binding: bool = True,
headnote: str = "",
summary: str = "",
document_id: UUID | None = None,
progress: ingest.ProgressCb | None = None,
) -> dict:
"""Ingest one external precedent. Thin wrapper over the canonical pipeline."""
inputs = {
"citation": citation, "case_name": case_name, "court": court,
"decision_date": decision_date, "source_type": source_type,
"precedent_level": precedent_level, "practice_area": practice_area,
"appeal_subtype": appeal_subtype, "subject_tags": subject_tags,
"is_binding": is_binding, "headnote": headnote, "summary": summary,
}
return await ingest.ingest_document(
_EXTERNAL_SPEC, inputs=inputs, file_path=file_path,
document_id=document_id, progress=progress,
)
```
> Define `_create_external_record` ABOVE `_EXTERNAL_SPEC` (Python resolves the name at
> dataclass-construction time). Reorder if needed.
- [ ] **Step 2: Run external-path tests**
Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_unified_ingest.py -k "external" -v`
Expected: `test_external_queues_both`, `test_enum_validation_rejects_bad_practice_area_external`,
`test_external_citation_guard_still_blocks_arar` PASS.
- [ ] **Step 3: Commit**
```bash
cd ~/legal-ai
git add mcp-server/src/legal_mcp/services/precedent_library.py
git commit -m "refactor(ingest): ingest_precedent delegates to canonical pipeline (FU-1)"
```
---
## Task 5: Internal spec + rewrite `ingest_internal_decision` as wrapper
**Files:**
- Modify: `mcp-server/src/legal_mcp/services/internal_decisions.py`
- [ ] **Step 1: Replace the ingest section with a spec + wrapper**
Remove `_coerce_date`, `_safe_filename`, and the inline pipeline body of
`ingest_internal_decision` (lines ~73-220). Keep `_VALID_DISTRICTS`, `_COURT_TO_DISTRICT`,
`_district_from_court`, and all migrate_*/enrich_*/search_internal functions. Add:
```python
from legal_mcp.services import ingest
INTERNAL_DECISIONS_DIR = Path(config.DATA_DIR) / "internal-decisions"
_VALID_PRACTICE_AREAS = frozenset({"", "rishuy_uvniya", "betterment_levy", "compensation_197"})
_VALID_DISTRICTS = frozenset({"", "ירושלים", "מרכז", "תל אביב", "צפון", "דרום", "ארצי"})
def _internal_validate(inputs: dict) -> None:
if not (inputs.get("case_number") or "").strip():
raise ValueError("case_number is required")
def _internal_derive(inputs: dict) -> dict:
district = (inputs.get("district") or "").strip() or _district_from_court(inputs.get("court") or "")
proc = (inputs.get("proceeding_type") or "").strip() or derive_proceeding_type(
appeal_subtype=inputs.get("appeal_subtype") or "", subject=inputs.get("case_name") or "",
)
return {"district": district, "proceeding_type": proc}
async def _create_internal_record(**kw) -> dict:
return await db.create_internal_committee_decision(
case_number=kw["case_number"].strip(),
case_name=kw["case_name"],
full_text=kw["full_text"],
court=(kw.get("court") or "").strip(),
decision_date=kw.get("decision_date"),
chair_name=(kw.get("chair_name") or "").strip(),
district=kw.get("district", ""),
practice_area=kw.get("practice_area", ""),
appeal_subtype=(kw.get("appeal_subtype") or "").strip(),
subject_tags=list(kw.get("subject_tags") or []),
summary=(kw.get("summary") or "").strip(),
is_binding=kw.get("is_binding", True),
document_id=kw.get("document_id"),
proceeding_type=kw.get("proceeding_type") or "ערר",
)
_INTERNAL_SPEC = ingest.IntakeSpec(
source_kind="internal_committee",
id_field="case_number",
staging_root=INTERNAL_DECISIONS_DIR,
staging_subdir=lambda inputs: (inputs.get("district") or "other"),
validate=_internal_validate,
enum_fields={"practice_area": _VALID_PRACTICE_AREAS, "district": _VALID_DISTRICTS},
derive=_internal_derive,
display_name_fallback="case_number",
create_record=_create_internal_record,
)
async def ingest_internal_decision(
*,
case_number: str,
case_name: str = "",
court: str = "",
decision_date=None,
chair_name: str = "",
district: str = "",
practice_area: str = "",
appeal_subtype: str = "",
subject_tags: list[str] | None = None,
summary: str = "",
is_binding: bool = True,
file_path: str | Path | None = None,
text: str | None = None,
document_id: UUID | None = None,
queue_halachot: bool = True, # retained for signature compat; pipeline always queues
proceeding_type: str = "",
) -> dict:
"""Ingest one appeals-committee decision. Thin wrapper over the canonical pipeline."""
inputs = {
"case_number": case_number, "case_name": case_name, "court": court,
"decision_date": decision_date, "chair_name": chair_name, "district": district,
"practice_area": practice_area, "appeal_subtype": appeal_subtype,
"subject_tags": subject_tags, "summary": summary, "is_binding": is_binding,
"proceeding_type": proceeding_type,
}
out = await ingest.ingest_document(
_INTERNAL_SPEC, inputs=inputs, file_path=file_path, text=text,
document_id=document_id,
)
return {"status": out["status"], "case_law_id": out["case_law_id"],
"chunks": out["chunks"], "halachot_pending": True}
```
> `queue_halachot=False` was only used by `migrate_from_style_corpus`. The canonical pipeline
> always queues both (per INV-ING3). Confirm with the user during execution that bulk
> re-migration queueing is acceptable; the migrate path is out of FU-1 scope but calls this
> wrapper. If suppression is still required, that is a follow-up — note it, do not silently drop.
- [ ] **Step 2: Run the full test file**
Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_unified_ingest.py -v`
Expected: ALL 9 tests PASS — including `test_internal_queues_BOTH_metadata_and_halacha` (GAP-02).
- [ ] **Step 3: Commit**
```bash
cd ~/legal-ai
git add mcp-server/src/legal_mcp/services/internal_decisions.py
git commit -m "refactor(ingest): ingest_internal_decision delegates to canonical pipeline; queue metadata too (GAP-02, FU-1)"
```
---
## Task 6: Dead-code sweep, smoke import, full suite
**Files:**
- Verify: `mcp-server/src/legal_mcp/services/precedent_library.py`, `internal_decisions.py`
- [ ] **Step 1: Confirm no orphaned references to removed helpers**
Run: `cd ~/legal-ai/mcp-server && grep -rn "_embed_precedent_pages\|_stage_file\|_safe_filename\|_coerce_date" src/legal_mcp/services/precedent_library.py src/legal_mcp/services/internal_decisions.py`
Expected: NO matches (all moved to `ingest.py`). If any remain in code paths other than ingest, leave them; if orphaned, delete.
- [ ] **Step 2: Smoke-import every affected module + its callers**
Run:
```bash
cd ~/legal-ai/mcp-server && .venv/bin/python -c "
from legal_mcp.services import ingest, precedent_library, internal_decisions
from legal_mcp.tools import precedent_library as t1, internal_decisions as t2
import inspect
sig_p = inspect.signature(precedent_library.ingest_precedent)
sig_i = inspect.signature(internal_decisions.ingest_internal_decision)
assert 'citation' in sig_p.parameters and 'file_path' in sig_p.parameters
assert 'case_number' in sig_i.parameters and 'text' in sig_i.parameters
print('signatures preserved; imports clean')
"
```
Expected: prints `signatures preserved; imports clean`.
- [ ] **Step 3: Run the entire test suite (no regressions elsewhere)**
Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/ -q`
Expected: all pre-existing tests still pass + the 9 new ones.
- [ ] **Step 4: Lint the changed files (match repo style)**
Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m ruff check src/legal_mcp/services/ingest.py src/legal_mcp/services/precedent_library.py src/legal_mcp/services/internal_decisions.py 2>/dev/null || echo "ruff not configured — skip"`
Expected: clean, or "skip".
- [ ] **Step 5: Update TaskMaster #59 → done**
Mark subtasks 59.1-59.4 and task 59 as done via task-master (verify via MCP get_task).
- [ ] **Step 6: Final commit**
```bash
cd ~/legal-ai
git add -A mcp-server/
git commit -m "chore(ingest): dead-code sweep + smoke checks for unified pipeline (FU-1)"
```
---
## Self-Review Notes
- **GAP-01** (single path) → Tasks 2-5. **GAP-02** (metadata queue) → Task 3 step 1 + test `test_internal_queues_BOTH_metadata_and_halacha`. **GAP-04** (enum validation) → `_validate_enums` + tests. **GAP-05** (staging/derive/multimodal/fallback/guard unified) → Task 3 + specs in Tasks 4-5.
- **Boundary preserved:** DB-create functions untouched (routed via `create_record`); no migration.
- **Open execution check:** `queue_halachot=False` suppression in `migrate_from_style_corpus` (Task 5 note) — surface to user, do not silently change bulk-migration behavior.
- **claude_session rule:** `ingest.py` imports only db/chunker/embeddings/extractor — no LLM extractors. Safe for container.

View File

@@ -0,0 +1,150 @@
# FU-1 — איחוד מסלול-הקליטה (Unified Ingest Path) — עיצוב
**סטטוס:** מאושר-לעיצוב · **תאריך:** 2026-05-30 · **ענף:** TBD (ייפתח בביצוע)
**מכסה:** GAP-01, GAP-02, GAP-04, GAP-05 · **מספק:** INV-ING1, INV-ING3, INV-G2, INV-G4
**מקורות:** [docs/spec/01-ingest.md](../../spec/01-ingest.md), [docs/spec/gap-audit.md](../../spec/gap-audit.md) · **משימה:** TaskMaster #59 (legal-ai)
**סוג-עבודה:** pure-code · **מיגרציה:** אין (אומת מול DB 2026-05-30 — ראה §6)
---
## 1. הבעיה
שתי פונקציות-קליטה מקבילות לישויות-אחיות, שמשכפלות את צעדי 210 של הפייפליין ומתפצלות
בפרטים:
- `services/precedent_library.py::ingest_precedent` (פסיקה חיצונית, `source_kind='external_upload'`)
- `services/internal_decisions.py::ingest_internal_decision` (החלטות-ועדה, `source_kind='internal_committee'`)
מסלולים מקבילים גוררים drift — צעד שקיים באחד וחסר באחר. הביטוי הקונקרטי: **GAP-02**
המסלול הפנימי מתזמן רק `request_halacha_extraction` ולא `request_metadata_extraction`,
ולכן החלטות-ועדה נקלטו בלי metadata. שש אסימטריות נוספות (GAP-01/04/05) מתועדות ב-01-ingest §4.
## 2. ההכרעה האדריכלית (מאומתת)
**Template Method skeleton + Strategy via config object.** פונקציה קנונית אחת מריצה את
שלד-הפייפליין (סדר-צעדים אכיף); כל מה שמשתנה לפי סוג נישא ב-config object מוזרק (`IntakeSpec`).
שתי הפונקציות הציבוריות נשמרות כ-API בעל-שם ומאצילות לליבה.
| החלטה | נימוק | מקורות (≥3) |
|-------|--------|-------------|
| מסלול קנוני יחיד (לא 2 מקבילים, לא ליבה-משותפת בין 2 כניסות) | Template Method: "אלגוריתמים כמעט-זהים עם הבדלים קטנים" → שלד אחד אוכף סדר-צעדים, צעד-חסר נעשה בלתי-אפשרי | refactoring.guru (Template Method); SourceMaking (Strategy); ADF parameterized-pipelines |
| שמירת `ingest_precedent`/`ingest_internal_decision` כ-API ציבורי | Fowler FlagArgument: "separate methods communicate more clearly"; ליבה משותפת מוסתרת כשהלוגיקה שזורה | martinfowler.com/bliki/FlagArgument; ardalis; luzkan smells |
| ווריאציה ב-config object (`IntakeSpec`), לא boolean-flags | flag-argument הוא code smell; config object נותן בהירות + הרחבה | Fowler; ardalis; dev.to flag-anti-pattern |
| `validate` כ-callable, `enum_fields` כ-data | callable להטרוגני (Strategy idiomatic ב-Python first-class), data להומוגני ("אל תכפה הכל ל-strategy") | Strategy/Wikipedia; Functional-Strategy dev.to; Strategy-in-Python Medium |
| `create_record` כ-callable מוזרק, לא `if source_kind` | Replace Conditional with Polymorphism + factory injection ("tell, don't ask") | refactoring.guru (Replace-Conditional); code-maze (Factory+DI); c-sharpcorner |
## 3. מבנה מודולים
**מודול חדש:** `mcp-server/src/legal_mcp/services/ingest.py`
```
services/ingest.py ← חדש (בית המסלול הקנוני)
├── IntakeSpec (frozen dataclass — מתאר-הסוג)
├── async ingest_document(spec, *, file_path|text, inputs, progress) ← ליבה: צעדים 110
├── _stage_file(src, root, subdir) (אחיד — מאוחד משני הקבצים)
├── _coerce_date / _safe_filename (אחיד — היום משוכפל)
└── _embed_pages(case_law_id, pdf, n) (עובר מ-precedent_library.py — צעד 7 אחיד)
```
**API ציבורי — חתימה ללא שינוי לקוראים:**
- `precedent_library.py::ingest_precedent(...)` → בונה `_EXTERNAL_SPEC`, קורא `ingest.ingest_document(...)`.
- `internal_decisions.py::ingest_internal_decision(...)` → בונה `_INTERNAL_SPEC`, קורא `ingest.ingest_document(...)`.
**לא זז (גבול FU-2):** `db.create_external_case_law` / `db.create_internal_committee_decision`
נשארות נפרדות; מנותבות דרך `IntakeSpec.create_record`. כל שאר הפונקציות בשני קבצי-השירות
(search_*, migrate_*, reextract_*, process_pending_extractions, enrich_*) **לא נוגעים בהן**.
**הקוראים שלא משתנים:** MCP tools (`tools/precedent_library.py`, `tools/internal_decisions.py`)
וה-HTTP API ב-`web/` ממשיכים לקרוא לאותן שתי פונקציות ציבוריות.
## 4. ה-IntakeSpec
```python
@dataclass(frozen=True)
class IntakeSpec:
source_kind: str # 'external_upload' | 'internal_committee'
id_field: str # 'citation' | 'case_number' (לוג/שגיאות)
staging_root: Path # PRECEDENT_LIBRARY_DIR | INTERNAL_DECISIONS_DIR
staging_subdir: Callable[[dict], str] # inputs → subdir (source_type | district | 'other')
validate: Callable[[dict], None] # מרים ValueError (citation-guard / chair_name-חובה)
enum_fields: dict[str, frozenset[str]] # נאכף לשני הסוגים (GAP-04)
derive: Callable[[dict], dict] # שדות-נגזרים (district, proceeding_type); identity לחיצוני
display_name_fallback: str # שם-השדה כשחסר case_name ('citation'|'case_number')
create_record: Callable[..., Awaitable[dict]] # create_external_case_law | create_internal_committee_decision
```
הליבה `ingest_document` **לא יודעת** איזה סוג רץ — רק מפעילה את ה-hooks בנקודות מוגדרות.
## 5. הפייפליין הקנוני (צעדים 110, לפי 01-ingest §2)
סדר-הביצוע בפועל (ה-DB-create מוקדם — נדרש `case_law_id` לפני אחסון chunks; תואם את הקוד הקיים):
| # | צעד | אחיד? | מקור-וריאציה |
|---|------|-------|---------------|
| 1 | ולידציית-קלט + enums | מנגנון אחיד | `spec.validate` + `spec.enum_fields` |
| 2 | גזירת-שדות | מנגנון אחיד | `spec.derive` (identity לחיצוני) |
| 3 | Stage file | מנגנון אחיד | `spec.staging_root` + `spec.staging_subdir` |
| 4 | Extract text (טקסט-ריק = כשל מדווח) | ✅ מלא | — (internal גם מקבל `text` ישיר, בלי קובץ) |
| 5 | Strip Nevo preamble | ✅ מלא | — |
| 6 | **DB create → `case_law_id`** (ספציפי-לסוג) | מנותב | `spec.create_record` (+ `display_name_fallback`) |
| 7 | Chunk (hierarchical/flat לפי `PARENT_DOC_RETRIEVAL_ENABLED`) | ✅ מלא | — (flag, לא סוג) |
| 8 | Embed children + Store chunks | ✅ מלא | — |
| 9 | **Multimodal page-image embed** (flag+PDF+page_count>0) | ✅ מלא | — (**GAP-05 fix**: היה רק בחיצוני) |
| 10 | **Queue metadata extraction** | ✅ מלא | — (**GAP-02 fix**: היה רק בחיצוני) |
| 11 | Queue halacha extraction | ✅ מלא | — |
| 12 | Set statuses (extraction=completed, halacha=pending) | ✅ מלא | — |
> הערה: 01-ingest §2 ממספר 110 בלי למנות מפורשות את ה-DB-create; כאן הוא צעד 6 כי הוא קודם
> ל-chunking בקוד בפועל. שגיאת-עיבוד אחרי create → `extraction_status=failed` (כמו היום).
**אילוץ `claude_session`:** הליבה רק **מתזמנת** (`request_*_extraction` — כתיבת-DB טהורה).
אין import של `halacha_extractor`/`precedent_metadata_extractor` במסלול-הקליטה — נשמר כפי שהיום.
## 6. שינויי-התנהגות וסיכון
| שינוי | השפעה | סיכון |
|--------|--------|--------|
| **GAP-02**: internal עכשיו מתזמן metadata | החלטות-ועדה חדשות יקבלו headnote/summary/tags | אין — תיקון; רשומות קיימות כבר מלאות (0/56 חסר) |
| **GAP-04**: ולידציית-enums על internal | קלט עם practice_area לא-חוקי יידחה בקליטה | נמוך — כל 56 הקיימות חוקיות; בודקים שקוראי-internal מעבירים ערכים חוקיים |
| **GAP-05 multimodal**: internal PDF עכשיו מטמיע עמודים | החלטות-ועדה חדשות PDF יקבלו page-images | אין (non-fatal); קיימות → backfill ב-**TaskMaster 61.2 (FU-3)** |
| **GAP-05 fallback/staging/derive/guard**: מאוחדים | התנהגות זהה, מסלול אחד | אין — citation-guard נשמר ב-`_EXTERNAL_SPEC.validate` |
**אין מיגרציה (אומת מול DB 2026-05-30):** internal_committee = 56 רשומות; metadata חסר = **0**;
enums לא-חוקיים = **0**; multimodal: 14/56 יש (42 חסר → FU-3 #61.2). הריפקטור משנה רק התנהגות
*קדימה*; אינו נוגע בנתונים שמורים.
**Drift מתועד (זניח, מכוון — מסקירת-קוד סופית):**
- **empty-chunks early-return:** כשה-chunker מחזיר ריק על טקסט לא-ריק (נדיר), המקור הציב
`halacha_status=completed` ויצא בלי לתזמן; הקנוני נופל הלאה ומתזמן את שני החילוצים עם
`halacha_status=pending`. עקבי עם INV-ING3 (תיזמון אחיד) — שיפור, לא רגרסיה.
- **thumbnails של multimodal** להחלטות-ועדה יושבים תחת `precedent-library/thumbnails/`
(ממופתח לפי `case_law_id`) — מכוון, מתועד ב-docstring של `spec_thumb_dir`.
- **`queue_halachot`** הוסר כליל (wrapper + `migrate_from_style_corpus`) — הדגל איבד משמעות
תחת INV-ING3; אומת שאין caller שמעביר אותו.
## 7. אסטרטגיית בדיקה
pytest offline עם monkeypatch לכל גבולות-ה-I/O (db, embeddings, chunker, extractor) — כתבנית
[tests/test_search_domain_scope.py](../../../mcp-server/tests/test_search_domain_scope.py)
ו-[tests/test_precedent_corpus_isolation.py](../../../mcp-server/tests/test_precedent_corpus_isolation.py).
קובץ חדש: `mcp-server/tests/test_unified_ingest.py`. רץ עם `.venv` המקומי.
מקרי-בדיקה (TDD — נכשלים לפני, עוברים אחרי):
1. **regression GAP-02**`ingest_internal_decision` מתזמן גם metadata **וגם** halacha (לוכד את הבאג המקורי).
2. שני הסוגים זורמים דרך `ingest.ingest_document` (לא דרך גוף-קוד נפרד).
3. ולידציית-enum דוחה `practice_area` לא-חוקי בשני הסוגים (GAP-04).
4. citation-guard עדיין חוסם ציטוט `ערר`/`בל"מ` במסלול החיצוני.
5. staging-subdir נפתר נכון (source_type לחיצוני, district לפנימי, 'other' ל-fallback).
6. מסלול-`text` (פנימי, בלי קובץ) ומסלול-`file_path` שניהם עובדים.
7. multimodal מותנה flag+PDF+page_count — **לא** בסוג-ה-intake; PDF פנימי → מטמיע, text → לא.
8. fallback לשם-תצוגה: חסר case_name → נופל למזהה הקנוני הנכון לכל סוג.
9. אידמפוטנטיות-חתימה: ערכי-החזרה של שתי הפונקציות הציבוריות נשמרים (תאימות-קוראים).
## 8. סדר-ביצוע
1. כתיבת `test_unified_ingest.py` (אדום).
2. `services/ingest.py``IntakeSpec` + `ingest_document` + הזזת `_embed_pages` + helpers אחידים.
3. `_EXTERNAL_SPEC` + צמצום `ingest_precedent` ל-wrapper.
4. `_INTERNAL_SPEC` + צמצום `ingest_internal_decision` ל-wrapper.
5. הרצת הבדיקות (ירוק) + lint.
6. בדיקת-עשן: import של שני קבצי-השירות + ה-MCP tools (ללא שבירת חתימות).

View File

@@ -0,0 +1,257 @@
"""Canonical ingest pipeline (FU-1).
One pipeline for all sibling-entity intake types (external precedent,
internal committee decision). Per-type variation rides on an ``IntakeSpec``
config object — never a parallel function. See
docs/spec/01-ingest.md and docs/superpowers/specs/2026-05-30-fu1-unified-ingest-design.md.
claude_session rule preserved: this module only QUEUES extraction
(``request_*_extraction`` = pure DB writes). It never imports
halacha_extractor / precedent_metadata_extractor, so it is safe to call
from the FastAPI container where the ``claude`` CLI is unavailable.
"""
from __future__ import annotations
import asyncio
import logging
import re
import shutil
from dataclasses import dataclass
from datetime import date
from pathlib import Path
from typing import Awaitable, Callable
from uuid import UUID, uuid4
from legal_mcp import config
from legal_mcp.services import chunker, db, embeddings, extractor
logger = logging.getLogger(__name__)
ProgressCb = Callable[[str, int, str], Awaitable[None]]
async def _noop_progress(_status: str, _percent: int, _msg: str) -> None:
return None
@dataclass(frozen=True)
class IntakeSpec:
"""Describes everything that varies between intake types."""
source_kind: str
id_field: str
staging_root: Path
staging_subdir: Callable[[dict], str]
validate: Callable[[dict], None]
enum_fields: dict[str, frozenset[str]]
derive: Callable[[dict], dict]
display_name_fallback: str
create_record: Callable[..., Awaitable[dict]]
def _coerce_date(value) -> date | None:
if value is None or value == "":
return None
if isinstance(value, date):
return value
if isinstance(value, str):
try:
return date.fromisoformat(value[:10])
except ValueError:
return None
return None
def _safe_filename(name: str) -> str:
base = Path(name).name
return re.sub(r"[^\w.\-+א-ת ]", "_", base) or f"upload-{uuid4().hex[:8]}"
def _stage_file(src_path: Path, root: Path, subdir: str) -> Path:
dest_dir = root / (subdir or "other")
dest_dir.mkdir(parents=True, exist_ok=True)
dest = dest_dir / f"{uuid4().hex[:8]}_{_safe_filename(src_path.name)}"
shutil.copy2(src_path, dest)
return dest
def _validate_enums(spec: IntakeSpec, inputs: dict) -> None:
for field_name, allowed in spec.enum_fields.items():
value = inputs.get(field_name, "") or ""
if value not in allowed:
raise ValueError(f"invalid {field_name}: {value!r}")
async def _embed_pages(case_law_id: UUID, pdf_path: Path, page_count: int) -> dict:
"""Render PDF pages → embed via voyage-multimodal → store. Non-fatal caller."""
thumb_dir = spec_thumb_dir(case_law_id)
rendered = await asyncio.to_thread(
extractor.render_pages_for_multimodal,
pdf_path, config.MULTIMODAL_DPI, config.MULTIMODAL_THUMB_DPI, thumb_dir,
)
images = [pil for pil, _ in rendered]
thumbs = [t for _, t in rendered]
img_embs = await embeddings.embed_images(images)
page_records = []
for i, (emb, thumb) in enumerate(zip(img_embs, thumbs)):
rel_thumb = None
if thumb is not None:
try:
rel_thumb = str(thumb.relative_to(config.DATA_DIR))
except ValueError:
rel_thumb = str(thumb)
page_records.append({
"page_number": i + 1, "embedding": emb, "image_thumbnail_path": rel_thumb,
})
stored = await db.store_precedent_image_embeddings(
case_law_id, page_records, model_name=config.MULTIMODAL_MODEL,
)
logger.info("Multimodal: stored %d page-image embeddings for case_law %s", stored, case_law_id)
return {"pages_embedded": stored}
def spec_thumb_dir(case_law_id: UUID) -> Path:
"""Thumbnails live under the precedent-library tree regardless of intake type."""
return Path(config.DATA_DIR) / "precedent-library" / "thumbnails" / str(case_law_id)
async def ingest_document(
spec: IntakeSpec,
*,
inputs: dict,
file_path: str | Path | None = None,
text: str | None = None,
document_id: UUID | None = None,
progress: ProgressCb | None = None,
) -> dict:
"""Run the canonical 12-step pipeline for one intake item.
``inputs`` carries the type-specific record fields (citation/case_number,
case_name, court, practice_area, etc.). ``spec`` decides how they are
validated, staged, derived, and which DB-create runs. Returns a dict with
at least: status, case_law_id, chunks.
"""
progress = progress or _noop_progress
# Step 1: input validation (type-specific) + enums (uniform mechanism).
if not file_path and text is None:
raise ValueError("either file_path or text is required")
spec.validate(inputs)
_validate_enums(spec, inputs)
# Step 2: field derivation (identity for external).
inputs = {**inputs, **spec.derive(inputs)}
# Steps 3-5: stage (if file) + extract + strip.
page_count = 0
page_offsets = None
staged: Path | None = None
if file_path:
src = Path(file_path)
if not src.is_file():
raise FileNotFoundError(f"file not found: {src}")
await progress("staging", 5, "מעתיק את הקובץ לאחסון")
staged = _stage_file(src, spec.staging_root, spec.staging_subdir(inputs))
await progress("extracting", 15, "מחלץ טקסט מהקובץ")
try:
raw_text, page_count, page_offsets = await extractor.extract_text(str(staged))
except Exception as e:
await progress("failed", 100, f"כשל בחילוץ טקסט: {e}")
raise
raw_text = extractor.strip_nevo_preamble((raw_text or "")).strip()
else:
raw_text = (text or "").strip()
if not raw_text:
await progress("failed", 100, "לא נמצא טקסט בקובץ")
raise ValueError("no extractable text in file")
# Step 6: DB create (type-specific, routed — get case_law_id).
await progress("storing_metadata", 25, "שומר את הרשומה במסד הנתונים")
display_name = (inputs.get("case_name") or "").strip() or (
inputs.get(spec.display_name_fallback) or ""
).strip()
record = await spec.create_record(
full_text=raw_text,
case_name=display_name,
decision_date=_coerce_date(inputs.get("decision_date")),
document_id=document_id,
**{k: v for k, v in inputs.items()
if k not in {"case_name", "decision_date", "file_path", "text"}},
)
case_law_id = UUID(str(record["id"]))
try:
stored_chunks = await _chunk_embed_store(case_law_id, raw_text, page_offsets, page_count, progress)
# Step 9: multimodal — uniform: flag + PDF + page_count, NOT intake type.
if (config.MULTIMODAL_ENABLED and page_count > 0
and staged is not None and staged.suffix.lower() == ".pdf"):
try:
await progress("embedding_images", 70, f"מטמיע {page_count} עמודי תמונה (multimodal)")
await _embed_pages(case_law_id, staged, page_count)
except Exception as e:
logger.warning("Multimodal embedding failed (non-fatal): %s", e)
# Steps 10-12: queue BOTH extractions (GAP-02 fix) + statuses.
await db.set_case_law_extraction_status(case_law_id, "completed")
await db.set_case_law_halacha_status(case_law_id, "pending")
await db.request_metadata_extraction(case_law_id)
await db.request_halacha_extraction(case_law_id)
await progress("completed", 100,
f"נקלט: {stored_chunks} chunks. חילוץ הלכות ומטא-דאטה ממתינים בתור.")
return {
"status": "completed",
"case_law_id": str(case_law_id),
"chunks": stored_chunks,
"halachot": 0,
"halachot_pending": True,
"metadata_filled": [],
"pages": page_count,
}
except Exception as e:
logger.exception("ingest_document failed (%s): %s", spec.source_kind, e)
await db.set_case_law_extraction_status(case_law_id, "failed")
await progress("failed", 100, f"כשל בעיבוד: {e}")
raise
async def _chunk_embed_store(case_law_id, text, page_offsets, page_count, progress) -> int:
"""Steps 7-8: chunk (hierarchical/flat by flag) → embed children → store."""
if config.PARENT_DOC_RETRIEVAL_ENABLED:
await progress("chunking", 40, f"מחלק את הטקסט ל-chunks היררכיים ({page_count} עמ')")
h_chunks = chunker.chunk_document_hierarchical(text, page_offsets=page_offsets)
if not h_chunks:
return 0
children = [c for c in h_chunks if c.role == "child"]
parents = [c for c in h_chunks if c.role == "parent"]
await progress("embedding", 55, f"מייצר embeddings ל-{len(children)} children ({len(parents)} parents)")
child_vectors = await embeddings.embed_texts([c.content for c in children], input_type="document")
chunk_dicts: list[dict] = []
for p in parents:
chunk_dicts.append({
"role": "parent", "local_id": p.local_id, "parent_local_id": None,
"chunk_index": p.chunk_index, "content": p.content,
"section_type": p.section_type, "page_number": p.page_number, "embedding": None,
})
for c, v in zip(children, child_vectors):
chunk_dicts.append({
"role": "child", "local_id": c.local_id, "parent_local_id": c.parent_local_id,
"chunk_index": c.chunk_index, "content": c.content,
"section_type": c.section_type, "page_number": c.page_number, "embedding": v,
})
counts = await db.store_precedent_chunks_hierarchical(case_law_id, chunk_dicts)
return counts["children"]
else:
await progress("chunking", 40, f"מחלק את הטקסט ל-chunks ({page_count} עמ')")
chunks = chunker.chunk_document(text, page_offsets=page_offsets)
if not chunks:
return 0
await progress("embedding", 55, f"מייצר embeddings ל-{len(chunks)} chunks")
chunk_vectors = await embeddings.embed_texts([c.content for c in chunks], input_type="document")
chunk_dicts = [
{"chunk_index": c.chunk_index, "content": c.content,
"section_type": c.section_type, "page_number": c.page_number, "embedding": v}
for c, v in zip(chunks, chunk_vectors)
]
return await db.store_precedent_chunks(case_law_id, chunk_dicts)

View File

@@ -16,21 +16,19 @@ Judicial decisions (Supreme Court, Administrative Court) stay in external_upload
from __future__ import annotations
import logging
import re
import shutil
from datetime import date
from pathlib import Path
from uuid import UUID, uuid4
from uuid import UUID
from legal_mcp import config
from legal_mcp.services import chunker, db, embeddings, extractor
from legal_mcp.services import db, embeddings, ingest
from legal_mcp.services.practice_area import derive_proceeding_type
logger = logging.getLogger(__name__)
INTERNAL_DECISIONS_DIR = Path(config.DATA_DIR) / "internal-decisions"
_VALID_DISTRICTS = {"", "ירושלים", "מרכז", "תל אביב", "צפון", "דרום", "ארצי"}
_VALID_PRACTICE_AREAS = frozenset({"", "rishuy_uvniya", "betterment_levy", "compensation_197"})
_VALID_DISTRICTS = frozenset({"", "ירושלים", "מרכז", "תל אביב", "צפון", "דרום", "ארצי"})
_COURT_TO_DISTRICT = [
("ירושלים", "ירושלים"),
@@ -45,24 +43,6 @@ _COURT_TO_DISTRICT = [
]
def _coerce_date(value) -> date | None:
if value is None or value == "":
return None
if isinstance(value, date):
return value
if isinstance(value, str):
try:
return date.fromisoformat(value[:10])
except ValueError:
return None
return None
def _safe_filename(name: str) -> str:
base = Path(name).name
return re.sub(r"[^\w.\-+א-ת ]", "_", base) or f"internal-{uuid4().hex[:8]}"
def _district_from_court(court: str) -> str:
for keyword, district in _COURT_TO_DISTRICT:
if keyword in court:
@@ -70,6 +50,51 @@ def _district_from_court(court: str) -> str:
return ""
def _internal_validate(inputs: dict) -> None:
if not (inputs.get("case_number") or "").strip():
raise ValueError("case_number is required")
def _internal_derive(inputs: dict) -> dict:
district = (inputs.get("district") or "").strip() or _district_from_court(inputs.get("court") or "")
proc = (inputs.get("proceeding_type") or "").strip() or derive_proceeding_type(
appeal_subtype=inputs.get("appeal_subtype") or "", subject=inputs.get("case_name") or "",
)
return {"district": district, "proceeding_type": proc}
async def _create_internal_record(**kw) -> dict:
return await db.create_internal_committee_decision(
case_number=kw["case_number"].strip(),
case_name=kw["case_name"],
full_text=kw["full_text"],
court=(kw.get("court") or "").strip(),
decision_date=kw.get("decision_date"),
chair_name=(kw.get("chair_name") or "").strip(),
district=kw.get("district", ""),
practice_area=kw.get("practice_area", ""),
appeal_subtype=(kw.get("appeal_subtype") or "").strip(),
subject_tags=list(kw.get("subject_tags") or []),
summary=(kw.get("summary") or "").strip(),
is_binding=kw.get("is_binding", True),
document_id=kw.get("document_id"),
proceeding_type=kw.get("proceeding_type") or "ערר",
)
_INTERNAL_SPEC = ingest.IntakeSpec(
source_kind="internal_committee",
id_field="case_number",
staging_root=INTERNAL_DECISIONS_DIR,
staging_subdir=lambda inputs: (inputs.get("district") or "other"),
validate=_internal_validate,
enum_fields={"practice_area": _VALID_PRACTICE_AREAS, "district": _VALID_DISTRICTS},
derive=_internal_derive,
display_name_fallback="case_number",
create_record=_create_internal_record,
)
async def ingest_internal_decision(
*,
case_number: str,
@@ -86,141 +111,25 @@ async def ingest_internal_decision(
file_path: str | Path | None = None,
text: str | None = None,
document_id: UUID | None = None,
queue_halachot: bool = True,
proceeding_type: str = "",
) -> dict:
"""Ingest an appeals-committee decision into the internal corpus.
Either file_path or text must be provided.
If district is empty, it is inferred from court.
If proceeding_type is empty, it is derived from appeal_subtype/case_name.
Returns: {"status": "completed", "case_law_id": "...", "chunks": N}
"""
if not file_path and not text:
raise ValueError("either file_path or text is required")
if not case_number.strip():
raise ValueError("case_number is required")
resolved_district = district.strip() or _district_from_court(court)
resolved_proc = proceeding_type.strip() or derive_proceeding_type(
appeal_subtype=appeal_subtype, subject=case_name,
)
if file_path:
src = Path(file_path)
if not src.is_file():
raise FileNotFoundError(f"file not found: {src}")
dest_dir = INTERNAL_DECISIONS_DIR / (resolved_district or "other")
dest_dir.mkdir(parents=True, exist_ok=True)
staged = dest_dir / f"{uuid4().hex[:8]}_{_safe_filename(src.name)}"
shutil.copy2(src, staged)
raw_text, page_count, page_offsets = await extractor.extract_text(str(staged))
raw_text = extractor.strip_nevo_preamble(raw_text or "").strip()
if not raw_text:
raise ValueError("no extractable text in file")
else:
raw_text = (text or "").strip()
if not raw_text:
raise ValueError("text is empty")
page_count = 0
page_offsets = None
record = await db.create_internal_committee_decision(
case_number=case_number.strip(),
case_name=(case_name.strip() or case_number.strip()),
full_text=raw_text,
court=court.strip(),
decision_date=_coerce_date(decision_date),
chair_name=chair_name.strip(),
district=resolved_district,
practice_area=practice_area,
appeal_subtype=appeal_subtype.strip(),
subject_tags=list(subject_tags or []),
summary=summary.strip(),
is_binding=is_binding,
"""Ingest one appeals-committee decision. Thin wrapper over the canonical pipeline."""
inputs = {
"case_number": case_number, "case_name": case_name, "court": court,
"decision_date": decision_date, "chair_name": chair_name, "district": district,
"practice_area": practice_area, "appeal_subtype": appeal_subtype,
"subject_tags": subject_tags, "summary": summary, "is_binding": is_binding,
"proceeding_type": proceeding_type,
}
out = await ingest.ingest_document(
_INTERNAL_SPEC, inputs=inputs, file_path=file_path, text=text,
document_id=document_id,
proceeding_type=resolved_proc,
)
case_law_id = UUID(str(record["id"]))
try:
# Parent-doc retrieval (TaskMaster #48) — same gated branch as
# ingest_precedent. Internal committee decisions are typically
# longer than external court rulings (full transcript + ruling),
# so the parent-doc benefit is even larger here.
if config.PARENT_DOC_RETRIEVAL_ENABLED:
h_chunks = chunker.chunk_document_hierarchical(
raw_text, page_offsets=page_offsets,
)
if not h_chunks:
await db.set_case_law_extraction_status(case_law_id, "completed")
await db.set_case_law_halacha_status(case_law_id, "completed")
return {"status": "completed", "case_law_id": str(case_law_id), "chunks": 0}
children = [c for c in h_chunks if c.role == "child"]
parents = [c for c in h_chunks if c.role == "parent"]
child_vectors = await embeddings.embed_texts(
[c.content for c in children], input_type="document",
)
chunk_dicts: list[dict] = []
for p in parents:
chunk_dicts.append({
"role": "parent", "local_id": p.local_id, "parent_local_id": None,
"chunk_index": p.chunk_index, "content": p.content,
"section_type": p.section_type, "page_number": p.page_number,
"embedding": None,
})
for c, v in zip(children, child_vectors):
chunk_dicts.append({
"role": "child", "local_id": c.local_id,
"parent_local_id": c.parent_local_id,
"chunk_index": c.chunk_index, "content": c.content,
"section_type": c.section_type, "page_number": c.page_number,
"embedding": v,
})
counts = await db.store_precedent_chunks_hierarchical(
case_law_id, chunk_dicts,
)
stored = counts["children"]
else:
chunks = chunker.chunk_document(raw_text, page_offsets=page_offsets)
if not chunks:
await db.set_case_law_extraction_status(case_law_id, "completed")
await db.set_case_law_halacha_status(case_law_id, "completed")
return {"status": "completed", "case_law_id": str(case_law_id), "chunks": 0}
chunk_texts = [c.content for c in chunks]
chunk_vectors = await embeddings.embed_texts(chunk_texts, input_type="document")
chunk_dicts = [
{
"chunk_index": c.chunk_index,
"content": c.content,
"section_type": c.section_type,
"page_number": c.page_number,
"embedding": v,
}
for c, v in zip(chunks, chunk_vectors)
]
stored = await db.store_precedent_chunks(case_law_id, chunk_dicts)
await db.set_case_law_extraction_status(case_law_id, "completed")
await db.set_case_law_halacha_status(case_law_id, "pending")
if queue_halachot:
await db.request_halacha_extraction(case_law_id)
return {
"status": "completed",
"case_law_id": str(case_law_id),
"chunks": stored,
"halachot_pending": True,
}
except Exception:
logger.exception("ingest_internal_decision failed for %s", case_number)
await db.set_case_law_extraction_status(case_law_id, "failed")
raise
return {"status": out["status"], "case_law_id": out["case_law_id"],
"chunks": out["chunks"], "halachot_pending": True}
async def migrate_from_style_corpus(dry_run: bool = False, queue_halachot: bool = True) -> dict:
async def migrate_from_style_corpus(dry_run: bool = False) -> dict:
"""Re-index all style_corpus entries as searchable internal committee decisions.
Does NOT delete style_corpus rows — they remain for style analysis.
@@ -278,7 +187,6 @@ async def migrate_from_style_corpus(dry_run: bool = False, queue_halachot: bool
appeal_subtype=subtype,
subject_tags=subject_tags,
text=row["full_text"],
queue_halachot=queue_halachot,
)
results["ingested"] += 1
logger.info("Migrated style_corpus entry: %s", case_number)

View File

@@ -15,15 +15,12 @@ from __future__ import annotations
import asyncio
import logging
import re
import shutil
from datetime import date
from pathlib import Path
from typing import Awaitable, Callable
from uuid import UUID, uuid4
from uuid import UUID
from legal_mcp import config
from legal_mcp.services import chunker, db, embeddings, extractor, hybrid_search, rerank # noqa: F401
from legal_mcp.services import db, embeddings, hybrid_search, ingest # noqa: F401
# Note: halacha_extractor and precedent_metadata_extractor are NOT imported
# at module load. They are imported lazily inside the dedicated re-extract
@@ -40,8 +37,8 @@ ProgressCb = Callable[[str, int, str], Awaitable[None]]
PRECEDENT_LIBRARY_DIR = Path(config.DATA_DIR) / "precedent-library"
_VALID_PRACTICE_AREAS = {"", "rishuy_uvniya", "betterment_levy", "compensation_197"}
_VALID_SOURCE_TYPES = {"", "court_ruling", "appeals_committee"}
_VALID_PRACTICE_AREAS = frozenset({"", "rishuy_uvniya", "betterment_levy", "compensation_197"})
_VALID_SOURCE_TYPES = frozenset({"", "court_ruling", "appeals_committee"})
_VALID_PRECEDENT_LEVELS = {
"", "עליון", "מנהלי", "ועדת_ערר_ארצית", "ועדת_ערר_מחוזית",
"supreme", "administrative", "national_appeals_committee", "district_appeals_committee",
@@ -52,37 +49,54 @@ async def _noop_progress(_status: str, _percent: int, _msg: str) -> None:
return None
def _safe_filename(name: str) -> str:
"""Strip path separators and unsafe chars from a user-provided name."""
base = Path(name).name
return re.sub(r"[^\w.\-+א-ת ]", "_", base) or f"upload-{uuid4().hex[:8]}"
def _external_validate(inputs: dict) -> None:
citation = (inputs.get("citation") or "").strip()
if not citation:
raise ValueError("citation is required")
if citation.startswith(("ערר ", "ערר(", 'בל"מ ', 'בל"מ(', "ARAR ")):
raise ValueError(
"ציטוט שמתחיל ב-'ערר' או 'בל\"מ' הוא החלטת ועדת ערר. "
"השתמש ב-internal_decision_upload (דורש chair_name + district), "
"לא ב-precedent_library_upload."
)
def _stage_file(src_path: Path, source_type: str) -> Path:
"""Copy the uploaded file into data/precedent-library/<source_type>/.
Returns the destination path. Source file is not deleted (caller decides).
"""
sub = source_type if source_type in {"court_ruling", "appeals_committee"} else "other"
dest_dir = PRECEDENT_LIBRARY_DIR / sub
dest_dir.mkdir(parents=True, exist_ok=True)
safe_name = _safe_filename(src_path.name)
dest = dest_dir / f"{uuid4().hex[:8]}_{safe_name}"
shutil.copy2(src_path, dest)
return dest
def _external_staging_subdir(inputs: dict) -> str:
st = inputs.get("source_type") or ""
return st if st in {"court_ruling", "appeals_committee"} else "other"
def _coerce_date(value) -> date | None:
if value is None or value == "":
return None
if isinstance(value, date):
return value
if isinstance(value, str):
try:
return date.fromisoformat(value[:10])
except ValueError:
return None
return None
async def _create_external_record(**kw) -> dict:
"""Adapter: maps canonical inputs (citation) to create_external_case_law(case_number)."""
return await db.create_external_case_law(
case_number=kw["citation"].strip(),
case_name=kw["case_name"],
full_text=kw["full_text"],
court=(kw.get("court") or "").strip(),
decision_date=kw.get("decision_date"),
practice_area=kw.get("practice_area", ""),
appeal_subtype=(kw.get("appeal_subtype") or "").strip(),
subject_tags=list(kw.get("subject_tags") or []),
summary=(kw.get("summary") or "").strip(),
headnote=(kw.get("headnote") or "").strip(),
source_type=kw.get("source_type", ""),
precedent_level=kw.get("precedent_level", ""),
is_binding=kw.get("is_binding", True),
document_id=kw.get("document_id"),
)
_EXTERNAL_SPEC = ingest.IntakeSpec(
source_kind="external_upload",
id_field="citation",
staging_root=PRECEDENT_LIBRARY_DIR,
staging_subdir=_external_staging_subdir,
validate=_external_validate,
enum_fields={"practice_area": _VALID_PRACTICE_AREAS, "source_type": _VALID_SOURCE_TYPES},
derive=lambda inputs: {},
display_name_fallback="citation",
create_record=_create_external_record,
)
async def ingest_precedent(
@@ -101,220 +115,20 @@ async def ingest_precedent(
headnote: str = "",
summary: str = "",
document_id: UUID | None = None,
progress: ProgressCb | None = None,
progress: ingest.ProgressCb | None = None,
) -> dict:
"""Ingest a single uploaded precedent through the full pipeline.
Required: file_path + citation. Everything else has a sensible default.
Returns:
``{"status": "...", "case_law_id": "...", "chunks": N, "halachot": M}``
"""
progress = progress or _noop_progress
src = Path(file_path)
if not src.is_file():
raise FileNotFoundError(f"file not found: {src}")
if not citation.strip():
raise ValueError("citation is required")
# Citation guard at service level (catches both MCP and HTTP API paths).
# Appeals-committee decisions must go through ingest_internal_decision
# which records chair_name+district. The MCP wrapper has the same guard
# for an earlier, friendlier error message — but this is the source of
# truth. See TaskMaster #30(ב) and DB constraint case_law_external_arar_check.
_norm = citation.strip()
if _norm.startswith(("ערר ", "ערר(", "בל\"מ ", "בל\"מ(", "ARAR ")):
raise ValueError(
"ציטוט שמתחיל ב-'ערר' או 'בל\"מ' הוא החלטת ועדת ערר. "
"השתמש ב-internal_decision_upload (דורש chair_name + district), "
"לא ב-precedent_library_upload."
)
if practice_area not in _VALID_PRACTICE_AREAS:
raise ValueError(f"invalid practice_area: {practice_area!r}")
if source_type not in _VALID_SOURCE_TYPES:
raise ValueError(f"invalid source_type: {source_type!r}")
await progress("staging", 5, "מעתיק את הקובץ לאחסון")
staged = _stage_file(src, source_type)
await progress("extracting", 15, "מחלץ טקסט מהקובץ")
try:
text, page_count, page_offsets = await extractor.extract_text(str(staged))
except Exception as e:
await progress("failed", 100, f"כשל בחילוץ טקסט: {e}")
raise
text = (text or "").strip()
if not text:
await progress("failed", 100, "לא נמצא טקסט בקובץ")
raise ValueError("no extractable text in file")
# Strip any Nevo preamble that might wrap court rulings downloaded from Nevo.
text = extractor.strip_nevo_preamble(text)
await progress("storing_metadata", 25, "שומר את הפסיקה במסד הנתונים")
record = await db.create_external_case_law(
case_number=citation.strip(),
case_name=case_name.strip() or citation.strip(),
full_text=text,
court=court.strip(),
decision_date=_coerce_date(decision_date),
practice_area=practice_area,
appeal_subtype=appeal_subtype.strip(),
subject_tags=list(subject_tags or []),
summary=summary.strip(),
headnote=headnote.strip(),
source_type=source_type,
precedent_level=precedent_level,
is_binding=is_binding,
document_id=document_id,
"""Ingest one external precedent. Thin wrapper over the canonical pipeline."""
inputs = {
"citation": citation, "case_name": case_name, "court": court,
"decision_date": decision_date, "source_type": source_type,
"precedent_level": precedent_level, "practice_area": practice_area,
"appeal_subtype": appeal_subtype, "subject_tags": subject_tags,
"is_binding": is_binding, "headnote": headnote, "summary": summary,
}
return await ingest.ingest_document(
_EXTERNAL_SPEC, inputs=inputs, file_path=file_path,
document_id=document_id, progress=progress,
)
case_law_id = UUID(str(record["id"]))
try:
# Parent-doc retrieval (TaskMaster #48): when enabled, emit
# two tiers (parents + children). Only children are embedded
# and indexed; parents carry retrieval context. When disabled,
# fall back to legacy single-tier chunking — identical
# behaviour to pre-V17.
if config.PARENT_DOC_RETRIEVAL_ENABLED:
await progress(
"chunking", 40,
f"מחלק את הטקסט ל-chunks היררכיים ({page_count} עמ')",
)
h_chunks = chunker.chunk_document_hierarchical(
text, page_offsets=page_offsets,
)
if not h_chunks:
await db.set_case_law_extraction_status(case_law_id, "completed")
await db.set_case_law_halacha_status(case_law_id, "completed")
await progress("completed", 100, "אין טקסט לעיבוד")
return {
"status": "completed",
"case_law_id": str(case_law_id),
"chunks": 0,
"halachot": 0,
}
children = [c for c in h_chunks if c.role == "child"]
parents = [c for c in h_chunks if c.role == "parent"]
await progress(
"embedding", 55,
f"מייצר embeddings ל-{len(children)} children "
f"({len(parents)} parents)",
)
child_texts = [c.content for c in children]
child_vectors = await embeddings.embed_texts(
child_texts, input_type="document",
)
# Build flat dict list for the two-pass writer.
chunk_dicts: list[dict] = []
for p in parents:
chunk_dicts.append({
"role": "parent",
"local_id": p.local_id,
"parent_local_id": None,
"chunk_index": p.chunk_index,
"content": p.content,
"section_type": p.section_type,
"page_number": p.page_number,
"embedding": None,
})
for c, v in zip(children, child_vectors):
chunk_dicts.append({
"role": "child",
"local_id": c.local_id,
"parent_local_id": c.parent_local_id,
"chunk_index": c.chunk_index,
"content": c.content,
"section_type": c.section_type,
"page_number": c.page_number,
"embedding": v,
})
counts = await db.store_precedent_chunks_hierarchical(
case_law_id, chunk_dicts,
)
stored_chunks = counts["children"]
else:
await progress(
"chunking", 40, f"מחלק את הטקסט ל-chunks ({page_count} עמ')",
)
chunks = chunker.chunk_document(text, page_offsets=page_offsets)
if not chunks:
await db.set_case_law_extraction_status(case_law_id, "completed")
await db.set_case_law_halacha_status(case_law_id, "completed")
await progress("completed", 100, "אין טקסט לעיבוד")
return {
"status": "completed",
"case_law_id": str(case_law_id),
"chunks": 0,
"halachot": 0,
}
await progress("embedding", 55, f"מייצר embeddings ל-{len(chunks)} chunks")
chunk_texts = [c.content for c in chunks]
chunk_vectors = await embeddings.embed_texts(chunk_texts, input_type="document")
chunk_dicts = [
{
"chunk_index": c.chunk_index,
"content": c.content,
"section_type": c.section_type,
"page_number": c.page_number,
"embedding": v,
}
for c, v in zip(chunks, chunk_vectors)
]
stored_chunks = await db.store_precedent_chunks(case_law_id, chunk_dicts)
# Multimodal page-image embeddings (V9). Gated by feature flag.
# Non-fatal: text path already succeeded. Only PDFs.
if config.MULTIMODAL_ENABLED and page_count > 0 and staged.suffix.lower() == ".pdf":
try:
await progress(
"embedding_images", 70,
f"מטמיע {page_count} עמודי תמונה (multimodal)",
)
await _embed_precedent_pages(case_law_id, staged, page_count)
except Exception as e:
logger.warning("Precedent multimodal embedding failed (non-fatal): %s", e)
# Pipeline split: the container does the non-LLM half (extract +
# chunk + embed + store). LLM-driven extraction (metadata, halachot)
# runs separately via the MCP tool `precedent_process_pending` from
# local Claude Code, where `claude` CLI is available.
#
# We auto-queue both extractions so the chair doesn't need to click
# any button — the moment they (or me) run `precedent_process_pending`
# in chat, both kinds get processed.
await db.set_case_law_extraction_status(case_law_id, "completed")
await db.set_case_law_halacha_status(case_law_id, "pending")
await db.request_metadata_extraction(case_law_id)
await db.request_halacha_extraction(case_law_id)
await progress(
"completed",
100,
f"הוכנס לספרייה: {stored_chunks} chunks. "
f"חילוץ הלכות ומטא-דאטה ממתינים בתור — "
f"להפעיל מ-Claude Code: precedent_process_pending.",
)
return {
"status": "completed",
"case_law_id": str(case_law_id),
"chunks": stored_chunks,
"halachot": 0,
"halachot_pending": True,
"metadata_filled": [],
"pages": page_count,
}
except Exception as e:
logger.exception("precedent_library.ingest_precedent failed: %s", e)
await db.set_case_law_extraction_status(case_law_id, "failed")
await progress("failed", 100, f"כשל בעיבוד: {e}")
raise
async def reextract_halachot(
@@ -586,48 +400,3 @@ async def search_library(
subject_tag=subject_tag,
include_halachot=include_halachot,
)
async def _embed_precedent_pages(
case_law_id: UUID,
pdf_path: Path,
page_count: int,
) -> dict:
"""Render precedent PDF pages → embed via voyage-multimodal → store.
Thumbnails go to
``data/precedent-library/thumbnails/{case_law_id}/p{N:03d}.jpg``.
"""
thumb_dir = PRECEDENT_LIBRARY_DIR / "thumbnails" / str(case_law_id)
rendered = await asyncio.to_thread(
extractor.render_pages_for_multimodal,
pdf_path,
config.MULTIMODAL_DPI,
config.MULTIMODAL_THUMB_DPI,
thumb_dir,
)
images = [pil for pil, _ in rendered]
thumbs = [t for _, t in rendered]
img_embs = await embeddings.embed_images(images)
page_records = []
for i, (emb, thumb) in enumerate(zip(img_embs, thumbs)):
rel_thumb = None
if thumb is not None:
try:
rel_thumb = str(thumb.relative_to(config.DATA_DIR))
except ValueError:
rel_thumb = str(thumb)
page_records.append({
"page_number": i + 1,
"embedding": emb,
"image_thumbnail_path": rel_thumb,
})
stored = await db.store_precedent_image_embeddings(
case_law_id, page_records, model_name=config.MULTIMODAL_MODEL,
)
logger.info(
"Multimodal: stored %d page-image embeddings for case_law %s",
stored, case_law_id,
)
return {"pages_embedded": stored}

View File

@@ -0,0 +1,169 @@
"""FU-1: unified ingest pipeline tests (offline, all I/O monkeypatched).
Proves both intake types flow through services.ingest.ingest_document and that
the canonical pipeline is symmetric: BOTH metadata and halacha extraction are
queued for BOTH types (GAP-02 regression), enum validation applies to both
(GAP-04), multimodal is gated by flag+PDF not by intake type (GAP-05), and the
external citation guard is preserved.
"""
from __future__ import annotations
import asyncio
from pathlib import Path
from uuid import uuid4
import pytest
from legal_mcp import config
from legal_mcp.services import db, embeddings, chunker, extractor
from legal_mcp.services import ingest, precedent_library, internal_decisions
def _run(coro):
return asyncio.run(coro)
class _Chunk:
def __init__(self, i):
self.chunk_index = i
self.content = f"chunk-{i}"
self.section_type = "body"
self.page_number = 1
self.role = "child"
self.local_id = f"c{i}"
self.parent_local_id = None
@pytest.fixture()
def patched(monkeypatch, tmp_path):
"""Patch every I/O boundary. Record queue + create calls."""
calls = {"metadata": [], "halacha": [], "create": [], "chunks": [], "pages": []}
async def _extract_text(path):
return ("full decision text", 2, [0, 100])
def _strip(text):
return text
def _chunk(text, page_offsets=None):
return [_Chunk(0), _Chunk(1)]
async def _embed(texts, input_type="document"):
return [[0.0] * 8 for _ in texts]
async def _store_chunks(cid, dicts):
calls["chunks"].append((cid, len(dicts)))
return len(dicts)
async def _create_external(**kw):
calls["create"].append(("external", kw))
return {"id": uuid4()}
async def _create_internal(**kw):
calls["create"].append(("internal", kw))
return {"id": uuid4()}
async def _req_meta(cid):
calls["metadata"].append(cid)
async def _req_hal(cid):
calls["halacha"].append(cid)
async def _set_status(cid, status):
return None
monkeypatch.setattr(extractor, "extract_text", _extract_text)
monkeypatch.setattr(extractor, "strip_nevo_preamble", _strip)
monkeypatch.setattr(chunker, "chunk_document", _chunk)
monkeypatch.setattr(embeddings, "embed_texts", _embed)
monkeypatch.setattr(db, "store_precedent_chunks", _store_chunks)
monkeypatch.setattr(db, "create_external_case_law", _create_external)
monkeypatch.setattr(db, "create_internal_committee_decision", _create_internal)
monkeypatch.setattr(db, "request_metadata_extraction", _req_meta)
monkeypatch.setattr(db, "request_halacha_extraction", _req_hal)
monkeypatch.setattr(db, "set_case_law_extraction_status", _set_status)
monkeypatch.setattr(db, "set_case_law_halacha_status", _set_status)
# Force flat chunking + multimodal OFF unless a test flips it.
monkeypatch.setattr(config, "PARENT_DOC_RETRIEVAL_ENABLED", False)
monkeypatch.setattr(config, "MULTIMODAL_ENABLED", False)
return calls
def _make_pdf(tmp_path) -> str:
p = tmp_path / "decision.pdf"
p.write_bytes(b"%PDF-1.4 fake")
return str(p)
def test_internal_queues_BOTH_metadata_and_halacha(patched, tmp_path):
"""GAP-02 regression: the internal path must queue metadata too."""
_run(internal_decisions.ingest_internal_decision(
case_number="8046/24", text="decision text", chair_name="דפנה תמיר",
district="ירושלים", practice_area="betterment_levy",
))
assert len(patched["metadata"]) == 1, "internal path must queue metadata (GAP-02)"
assert len(patched["halacha"]) == 1
def test_external_queues_both(patched, tmp_path):
_run(precedent_library.ingest_precedent(
file_path=_make_pdf(tmp_path), citation="עע\"מ 1234/20",
practice_area="rishuy_uvniya", source_type="court_ruling",
))
assert len(patched["metadata"]) == 1
assert len(patched["halacha"]) == 1
def test_both_types_go_through_ingest_document(patched, tmp_path, monkeypatch):
seen = []
real = ingest.ingest_document
async def _spy(spec, **kw):
seen.append(spec.source_kind)
return await real(spec, **kw)
monkeypatch.setattr(ingest, "ingest_document", _spy)
_run(internal_decisions.ingest_internal_decision(
case_number="8046/24", text="t", chair_name="דפנה תמיר", practice_area="betterment_levy"))
_run(precedent_library.ingest_precedent(
file_path=_make_pdf(tmp_path), citation="עע\"מ 1/20", practice_area="rishuy_uvniya"))
assert seen == ["internal_committee", "external_upload"]
def test_enum_validation_rejects_bad_practice_area_internal(patched, tmp_path):
"""GAP-04: internal path must validate enums like the external one."""
with pytest.raises(ValueError, match="practice_area"):
_run(internal_decisions.ingest_internal_decision(
case_number="8046/24", text="t", chair_name="x", practice_area="bogus"))
def test_enum_validation_rejects_bad_practice_area_external(patched, tmp_path):
with pytest.raises(ValueError, match="practice_area"):
_run(precedent_library.ingest_precedent(
file_path=_make_pdf(tmp_path), citation="עע\"מ 1/20", practice_area="bogus"))
def test_external_citation_guard_still_blocks_arar(patched, tmp_path):
with pytest.raises(ValueError, match="ערר"):
_run(precedent_library.ingest_precedent(
file_path=_make_pdf(tmp_path), citation="ערר 1234/24"))
def test_internal_text_path_works_without_file(patched):
out = _run(internal_decisions.ingest_internal_decision(
case_number="8046/24", text="t", chair_name="x", practice_area="betterment_levy"))
assert out["status"] == "completed"
assert out["case_law_id"]
def test_internal_requires_file_or_text(patched):
with pytest.raises(ValueError, match="file_path or text"):
_run(internal_decisions.ingest_internal_decision(
case_number="8046/24", chair_name="x", practice_area="betterment_levy"))
def test_display_name_fallback_uses_canonical_id(patched, tmp_path):
_run(internal_decisions.ingest_internal_decision(
case_number="8046/24", text="t", chair_name="x", practice_area="betterment_levy"))
kind, kw = patched["create"][0]
assert kw["case_name"] == "8046/24", "missing case_name falls back to canonical id"