FU-1: איחוד מסלול-הקליטה למסלול קנוני אחד (GAP-01/02/04/05) #11
@@ -2007,7 +2007,7 @@
|
||||
"description": "מאחד את ingest_precedent ו-ingest_internal_decision למסלול קנוני יחיד; מבטל את האסימטריות.",
|
||||
"details": "מכסה GAP-01,02,04,05. מספק INV-ING1/ING3/G2/G4. severity: Critical. סוג: קוד. יסוד — FU-2/FU-3/FU-7 תלויים בו. מקור: docs/spec/gap-audit.md + 01-ingest.md.",
|
||||
"testStrategy": "",
|
||||
"status": "pending",
|
||||
"status": "done",
|
||||
"dependencies": [],
|
||||
"priority": "high",
|
||||
"subtasks": [
|
||||
@@ -2017,7 +2017,7 @@
|
||||
"description": "ביטול שני המסלולים המקבילים (precedent_library.py:88 vs internal_decisions.py:73); כל סוג = פרמטרים, לא פונקציה נפרדת.",
|
||||
"dependencies": [],
|
||||
"details": "INV-ING1/G2",
|
||||
"status": "pending",
|
||||
"status": "done",
|
||||
"testStrategy": "",
|
||||
"parentId": "59"
|
||||
},
|
||||
@@ -2027,7 +2027,7 @@
|
||||
"description": "קריאה ל-request_metadata_extraction גם במסלול הפנימי (היום רק halacha, internal_decisions.py:208).",
|
||||
"dependencies": [],
|
||||
"details": "INV-ING3/DM1",
|
||||
"status": "pending",
|
||||
"status": "done",
|
||||
"testStrategy": "",
|
||||
"parentId": "59"
|
||||
},
|
||||
@@ -2037,7 +2037,7 @@
|
||||
"description": "הוספת ולידציית practice_area/source_type למסלול הפנימי (כמו precedent_library.py:131-134).",
|
||||
"dependencies": [],
|
||||
"details": "INV-G4",
|
||||
"status": "pending",
|
||||
"status": "done",
|
||||
"testStrategy": "",
|
||||
"parentId": "59"
|
||||
},
|
||||
@@ -2047,7 +2047,7 @@
|
||||
"description": "שאר 6 האסימטריות → פרמטרים של המסלול הקנוני (01-ingest §4).",
|
||||
"dependencies": [],
|
||||
"details": "INV-ING1",
|
||||
"status": "pending",
|
||||
"status": "done",
|
||||
"testStrategy": "",
|
||||
"parentId": "59"
|
||||
}
|
||||
@@ -2140,6 +2140,18 @@
|
||||
"status": "pending",
|
||||
"testStrategy": "",
|
||||
"parentId": "61"
|
||||
},
|
||||
{
|
||||
"id": 2,
|
||||
"title": "[backfill] multimodal page-images ל-42 החלטות-ועדה קיימות",
|
||||
"description": "42/56 רשומות source_kind='internal_committee' נקלטו במסלול הישן בלי multimodal page-images (FU-1 מתקן רק קדימה). אחרי שמנגנון ה-re-index של FU-3 קיים — להריץ re-embed של עמודי-תמונה עליהן. ⚠️ קודם לכמת כמה מה-42 הן PDF-backed (לרשומות שנקלטו מ-text בלבד אין קובץ → אי-אפשר להטמיע עמודים). רק PDF-backed רלוונטיות.",
|
||||
"dependencies": [
|
||||
1
|
||||
],
|
||||
"details": "מקור: בדיקת DB 2026-05-30 (precedent_image_embeddings JOIN case_law). internal_committee: 14/56 עם page-images, 42 בלי. נגזר מ-GAP-02/FU-1 boundary discussion. לא פער-תקינות — שיפור multimodal coverage.",
|
||||
"status": "pending",
|
||||
"testStrategy": "",
|
||||
"parentId": "61"
|
||||
}
|
||||
],
|
||||
"updatedAt": "2026-05-30T17:37:34.741136+00:00"
|
||||
|
||||
833
docs/superpowers/plans/2026-05-30-fu1-unified-ingest.md
Normal file
833
docs/superpowers/plans/2026-05-30-fu1-unified-ingest.md
Normal file
@@ -0,0 +1,833 @@
|
||||
# FU-1 Unified Ingest Path — Implementation Plan
|
||||
|
||||
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
|
||||
|
||||
**Goal:** Collapse the two parallel ingest functions (`ingest_precedent`, `ingest_internal_decision`) into one canonical pipeline parameterized by an `IntakeSpec`, closing GAP-01/02/04/05.
|
||||
|
||||
**Architecture:** New module `services/ingest.py` holds a Template-Method skeleton `ingest_document(spec, ...)`; per-type variation rides on a frozen `IntakeSpec` config object (staging resolver, validate callable, enum_fields data, derive callable, display-name fallback, injected `create_record`). The two existing public functions stay as named entry points that build a spec and delegate. The DB-create functions are NOT merged (FU-2 boundary) — only routed via `spec.create_record`.
|
||||
|
||||
**Tech Stack:** Python 3.12, asyncpg, pytest (offline, monkeypatched I/O), local `.venv` at `mcp-server/.venv`.
|
||||
|
||||
**Spec:** [docs/superpowers/specs/2026-05-30-fu1-unified-ingest-design.md](../specs/2026-05-30-fu1-unified-ingest-design.md)
|
||||
|
||||
**Run tests with:** `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_unified_ingest.py -v`
|
||||
|
||||
---
|
||||
|
||||
## File Structure
|
||||
|
||||
- **Create** `mcp-server/src/legal_mcp/services/ingest.py` — canonical pipeline + `IntakeSpec` + shared helpers (`_stage_file`, `_coerce_date`, `_safe_filename`, `_embed_pages`).
|
||||
- **Create** `mcp-server/tests/test_unified_ingest.py` — offline behavioral tests.
|
||||
- **Modify** `mcp-server/src/legal_mcp/services/precedent_library.py` — `ingest_precedent` becomes a thin wrapper building `_EXTERNAL_SPEC`; delete inline pipeline + moved helpers; keep everything else (search, reextract, process_pending, list, delete, get).
|
||||
- **Modify** `mcp-server/src/legal_mcp/services/internal_decisions.py` — `ingest_internal_decision` becomes a thin wrapper building `_INTERNAL_SPEC`; delete inline pipeline + moved helpers; keep migrate_*, enrich_*, search_internal.
|
||||
|
||||
**Unchanged callers (verify, don't edit):** `tools/precedent_library.py`, `tools/internal_decisions.py`, `web/` HTTP handlers — they call the two public functions whose signatures are preserved.
|
||||
|
||||
---
|
||||
|
||||
## Task 1: Failing tests for the unified pipeline
|
||||
|
||||
**Files:**
|
||||
- Test: `mcp-server/tests/test_unified_ingest.py`
|
||||
|
||||
- [ ] **Step 1: Write the failing tests**
|
||||
|
||||
```python
|
||||
"""FU-1: unified ingest pipeline tests (offline, all I/O monkeypatched).
|
||||
|
||||
Proves both intake types flow through services.ingest.ingest_document and that
|
||||
the canonical pipeline is symmetric: BOTH metadata and halacha extraction are
|
||||
queued for BOTH types (GAP-02 regression), enum validation applies to both
|
||||
(GAP-04), multimodal is gated by flag+PDF not by intake type (GAP-05), and the
|
||||
external citation guard is preserved.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from pathlib import Path
|
||||
from uuid import uuid4
|
||||
|
||||
import pytest
|
||||
|
||||
from legal_mcp import config
|
||||
from legal_mcp.services import db, embeddings, chunker, extractor
|
||||
from legal_mcp.services import ingest, precedent_library, internal_decisions
|
||||
|
||||
|
||||
def _run(coro):
|
||||
return asyncio.run(coro)
|
||||
|
||||
|
||||
class _Chunk:
|
||||
def __init__(self, i):
|
||||
self.chunk_index = i
|
||||
self.content = f"chunk-{i}"
|
||||
self.section_type = "body"
|
||||
self.page_number = 1
|
||||
self.role = "child"
|
||||
self.local_id = f"c{i}"
|
||||
self.parent_local_id = None
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def patched(monkeypatch, tmp_path):
|
||||
"""Patch every I/O boundary. Record queue + create calls."""
|
||||
calls = {"metadata": [], "halacha": [], "create": [], "chunks": [], "pages": []}
|
||||
|
||||
async def _extract_text(path):
|
||||
return ("full decision text", 2, [0, 100])
|
||||
|
||||
def _strip(text):
|
||||
return text
|
||||
|
||||
def _chunk(text, page_offsets=None):
|
||||
return [_Chunk(0), _Chunk(1)]
|
||||
|
||||
async def _embed(texts, input_type="document"):
|
||||
return [[0.0] * 8 for _ in texts]
|
||||
|
||||
async def _store_chunks(cid, dicts):
|
||||
calls["chunks"].append((cid, len(dicts)))
|
||||
return len(dicts)
|
||||
|
||||
async def _create_external(**kw):
|
||||
calls["create"].append(("external", kw))
|
||||
return {"id": uuid4()}
|
||||
|
||||
async def _create_internal(**kw):
|
||||
calls["create"].append(("internal", kw))
|
||||
return {"id": uuid4()}
|
||||
|
||||
async def _req_meta(cid):
|
||||
calls["metadata"].append(cid)
|
||||
|
||||
async def _req_hal(cid):
|
||||
calls["halacha"].append(cid)
|
||||
|
||||
async def _set_status(cid, status):
|
||||
return None
|
||||
|
||||
monkeypatch.setattr(extractor, "extract_text", _extract_text)
|
||||
monkeypatch.setattr(extractor, "strip_nevo_preamble", _strip)
|
||||
monkeypatch.setattr(chunker, "chunk_document", _chunk)
|
||||
monkeypatch.setattr(embeddings, "embed_texts", _embed)
|
||||
monkeypatch.setattr(db, "store_precedent_chunks", _store_chunks)
|
||||
monkeypatch.setattr(db, "create_external_case_law", _create_external)
|
||||
monkeypatch.setattr(db, "create_internal_committee_decision", _create_internal)
|
||||
monkeypatch.setattr(db, "request_metadata_extraction", _req_meta)
|
||||
monkeypatch.setattr(db, "request_halacha_extraction", _req_hal)
|
||||
monkeypatch.setattr(db, "set_case_law_extraction_status", _set_status)
|
||||
monkeypatch.setattr(db, "set_case_law_halacha_status", _set_status)
|
||||
# Force flat chunking + multimodal OFF unless a test flips it.
|
||||
monkeypatch.setattr(config, "PARENT_DOC_RETRIEVAL_ENABLED", False)
|
||||
monkeypatch.setattr(config, "MULTIMODAL_ENABLED", False)
|
||||
return calls
|
||||
|
||||
|
||||
def _make_pdf(tmp_path) -> str:
|
||||
p = tmp_path / "decision.pdf"
|
||||
p.write_bytes(b"%PDF-1.4 fake")
|
||||
return str(p)
|
||||
|
||||
|
||||
def test_internal_queues_BOTH_metadata_and_halacha(patched, tmp_path):
|
||||
"""GAP-02 regression: the internal path must queue metadata too."""
|
||||
_run(internal_decisions.ingest_internal_decision(
|
||||
case_number="8046/24", text="decision text", chair_name="דפנה תמיר",
|
||||
district="ירושלים", practice_area="betterment_levy",
|
||||
))
|
||||
assert len(patched["metadata"]) == 1, "internal path must queue metadata (GAP-02)"
|
||||
assert len(patched["halacha"]) == 1
|
||||
|
||||
|
||||
def test_external_queues_both(patched, tmp_path):
|
||||
_run(precedent_library.ingest_precedent(
|
||||
file_path=_make_pdf(tmp_path), citation="עע\"מ 1234/20",
|
||||
practice_area="rishuy_uvniya", source_type="court_ruling",
|
||||
))
|
||||
assert len(patched["metadata"]) == 1
|
||||
assert len(patched["halacha"]) == 1
|
||||
|
||||
|
||||
def test_both_types_go_through_ingest_document(patched, tmp_path, monkeypatch):
|
||||
seen = []
|
||||
real = ingest.ingest_document
|
||||
|
||||
async def _spy(spec, **kw):
|
||||
seen.append(spec.source_kind)
|
||||
return await real(spec, **kw)
|
||||
|
||||
monkeypatch.setattr(ingest, "ingest_document", _spy)
|
||||
_run(internal_decisions.ingest_internal_decision(
|
||||
case_number="8046/24", text="t", chair_name="דפנה תמיר", practice_area="betterment_levy"))
|
||||
_run(precedent_library.ingest_precedent(
|
||||
file_path=_make_pdf(tmp_path), citation="עע\"מ 1/20", practice_area="rishuy_uvniya"))
|
||||
assert seen == ["internal_committee", "external_upload"]
|
||||
|
||||
|
||||
def test_enum_validation_rejects_bad_practice_area_internal(patched, tmp_path):
|
||||
"""GAP-04: internal path must validate enums like the external one."""
|
||||
with pytest.raises(ValueError, match="practice_area"):
|
||||
_run(internal_decisions.ingest_internal_decision(
|
||||
case_number="8046/24", text="t", chair_name="x", practice_area="bogus"))
|
||||
|
||||
|
||||
def test_enum_validation_rejects_bad_practice_area_external(patched, tmp_path):
|
||||
with pytest.raises(ValueError, match="practice_area"):
|
||||
_run(precedent_library.ingest_precedent(
|
||||
file_path=_make_pdf(tmp_path), citation="עע\"מ 1/20", practice_area="bogus"))
|
||||
|
||||
|
||||
def test_external_citation_guard_still_blocks_arar(patched, tmp_path):
|
||||
with pytest.raises(ValueError, match="ערר"):
|
||||
_run(precedent_library.ingest_precedent(
|
||||
file_path=_make_pdf(tmp_path), citation="ערר 1234/24"))
|
||||
|
||||
|
||||
def test_internal_text_path_works_without_file(patched):
|
||||
out = _run(internal_decisions.ingest_internal_decision(
|
||||
case_number="8046/24", text="t", chair_name="x", practice_area="betterment_levy"))
|
||||
assert out["status"] == "completed"
|
||||
assert out["case_law_id"]
|
||||
|
||||
|
||||
def test_internal_requires_file_or_text(patched):
|
||||
with pytest.raises(ValueError, match="file_path or text"):
|
||||
_run(internal_decisions.ingest_internal_decision(
|
||||
case_number="8046/24", chair_name="x", practice_area="betterment_levy"))
|
||||
|
||||
|
||||
def test_display_name_fallback_uses_canonical_id(patched, tmp_path):
|
||||
_run(internal_decisions.ingest_internal_decision(
|
||||
case_number="8046/24", text="t", chair_name="x", practice_area="betterment_levy"))
|
||||
kind, kw = patched["create"][0]
|
||||
assert kw["case_name"] == "8046/24", "missing case_name falls back to canonical id"
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Run tests to verify they fail**
|
||||
|
||||
Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_unified_ingest.py -v`
|
||||
Expected: FAIL — `ModuleNotFoundError: No module named 'legal_mcp.services.ingest'` (or ImportError).
|
||||
|
||||
- [ ] **Step 3: Commit the red tests**
|
||||
|
||||
```bash
|
||||
cd ~/legal-ai
|
||||
git add mcp-server/tests/test_unified_ingest.py
|
||||
git commit -m "test(ingest): failing tests for unified pipeline (FU-1)"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 2: Canonical module `ingest.py` — IntakeSpec + shared helpers
|
||||
|
||||
**Files:**
|
||||
- Create: `mcp-server/src/legal_mcp/services/ingest.py`
|
||||
|
||||
- [ ] **Step 1: Write the module header, IntakeSpec, and shared helpers**
|
||||
|
||||
```python
|
||||
"""Canonical ingest pipeline (FU-1).
|
||||
|
||||
One pipeline for all sibling-entity intake types (external precedent,
|
||||
internal committee decision). Per-type variation rides on an ``IntakeSpec``
|
||||
config object — never a parallel function. See
|
||||
docs/spec/01-ingest.md and docs/superpowers/specs/2026-05-30-fu1-unified-ingest-design.md.
|
||||
|
||||
claude_session rule preserved: this module only QUEUES extraction
|
||||
(``request_*_extraction`` = pure DB writes). It never imports
|
||||
halacha_extractor / precedent_metadata_extractor, so it is safe to call
|
||||
from the FastAPI container where the ``claude`` CLI is unavailable.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import re
|
||||
import shutil
|
||||
from dataclasses import dataclass
|
||||
from datetime import date
|
||||
from pathlib import Path
|
||||
from typing import Awaitable, Callable
|
||||
from uuid import UUID, uuid4
|
||||
|
||||
from legal_mcp import config
|
||||
from legal_mcp.services import chunker, db, embeddings, extractor
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
ProgressCb = Callable[[str, int, str], Awaitable[None]]
|
||||
|
||||
|
||||
async def _noop_progress(_status: str, _percent: int, _msg: str) -> None:
|
||||
return None
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class IntakeSpec:
|
||||
"""Describes everything that varies between intake types."""
|
||||
source_kind: str
|
||||
id_field: str
|
||||
staging_root: Path
|
||||
staging_subdir: Callable[[dict], str]
|
||||
validate: Callable[[dict], None]
|
||||
enum_fields: dict[str, frozenset[str]]
|
||||
derive: Callable[[dict], dict]
|
||||
display_name_fallback: str
|
||||
create_record: Callable[..., Awaitable[dict]]
|
||||
|
||||
|
||||
def _coerce_date(value) -> date | None:
|
||||
if value is None or value == "":
|
||||
return None
|
||||
if isinstance(value, date):
|
||||
return value
|
||||
if isinstance(value, str):
|
||||
try:
|
||||
return date.fromisoformat(value[:10])
|
||||
except ValueError:
|
||||
return None
|
||||
return None
|
||||
|
||||
|
||||
def _safe_filename(name: str) -> str:
|
||||
base = Path(name).name
|
||||
return re.sub(r"[^\w.\-+א-ת ]", "_", base) or f"upload-{uuid4().hex[:8]}"
|
||||
|
||||
|
||||
def _stage_file(src_path: Path, root: Path, subdir: str) -> Path:
|
||||
dest_dir = root / (subdir or "other")
|
||||
dest_dir.mkdir(parents=True, exist_ok=True)
|
||||
dest = dest_dir / f"{uuid4().hex[:8]}_{_safe_filename(src_path.name)}"
|
||||
shutil.copy2(src_path, dest)
|
||||
return dest
|
||||
|
||||
|
||||
def _validate_enums(spec: IntakeSpec, inputs: dict) -> None:
|
||||
for field_name, allowed in spec.enum_fields.items():
|
||||
value = inputs.get(field_name, "") or ""
|
||||
if value not in allowed:
|
||||
raise ValueError(f"invalid {field_name}: {value!r}")
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Add the multimodal page-embed helper (moved verbatim from precedent_library.py)**
|
||||
|
||||
```python
|
||||
async def _embed_pages(case_law_id: UUID, pdf_path: Path, page_count: int) -> dict:
|
||||
"""Render PDF pages → embed via voyage-multimodal → store. Non-fatal caller."""
|
||||
thumb_dir = spec_thumb_dir(case_law_id)
|
||||
rendered = await asyncio.to_thread(
|
||||
extractor.render_pages_for_multimodal,
|
||||
pdf_path, config.MULTIMODAL_DPI, config.MULTIMODAL_THUMB_DPI, thumb_dir,
|
||||
)
|
||||
images = [pil for pil, _ in rendered]
|
||||
thumbs = [t for _, t in rendered]
|
||||
img_embs = await embeddings.embed_images(images)
|
||||
|
||||
page_records = []
|
||||
for i, (emb, thumb) in enumerate(zip(img_embs, thumbs)):
|
||||
rel_thumb = None
|
||||
if thumb is not None:
|
||||
try:
|
||||
rel_thumb = str(thumb.relative_to(config.DATA_DIR))
|
||||
except ValueError:
|
||||
rel_thumb = str(thumb)
|
||||
page_records.append({
|
||||
"page_number": i + 1, "embedding": emb, "image_thumbnail_path": rel_thumb,
|
||||
})
|
||||
stored = await db.store_precedent_image_embeddings(
|
||||
case_law_id, page_records, model_name=config.MULTIMODAL_MODEL,
|
||||
)
|
||||
logger.info("Multimodal: stored %d page-image embeddings for case_law %s", stored, case_law_id)
|
||||
return {"pages_embedded": stored}
|
||||
|
||||
|
||||
def spec_thumb_dir(case_law_id: UUID) -> Path:
|
||||
"""Thumbnails live under the precedent-library tree regardless of intake type."""
|
||||
return Path(config.DATA_DIR) / "precedent-library" / "thumbnails" / str(case_law_id)
|
||||
```
|
||||
|
||||
- [ ] **Step 3: Verify the module imports cleanly**
|
||||
|
||||
Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -c "from legal_mcp.services import ingest; print(ingest.IntakeSpec.__name__)"`
|
||||
Expected: prints `IntakeSpec`, no error.
|
||||
|
||||
- [ ] **Step 4: Commit**
|
||||
|
||||
```bash
|
||||
cd ~/legal-ai
|
||||
git add mcp-server/src/legal_mcp/services/ingest.py
|
||||
git commit -m "feat(ingest): IntakeSpec + shared helpers for canonical pipeline (FU-1)"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 3: Canonical `ingest_document`
|
||||
|
||||
**Files:**
|
||||
- Modify: `mcp-server/src/legal_mcp/services/ingest.py` (append `ingest_document`)
|
||||
|
||||
- [ ] **Step 1: Append the canonical pipeline function**
|
||||
|
||||
```python
|
||||
async def ingest_document(
|
||||
spec: IntakeSpec,
|
||||
*,
|
||||
inputs: dict,
|
||||
file_path: str | Path | None = None,
|
||||
text: str | None = None,
|
||||
document_id: UUID | None = None,
|
||||
progress: ProgressCb | None = None,
|
||||
) -> dict:
|
||||
"""Run the canonical 12-step pipeline for one intake item.
|
||||
|
||||
``inputs`` carries the type-specific record fields (citation/case_number,
|
||||
case_name, court, practice_area, etc.). ``spec`` decides how they are
|
||||
validated, staged, derived, and which DB-create runs. Returns a dict with
|
||||
at least: status, case_law_id, chunks.
|
||||
"""
|
||||
progress = progress or _noop_progress
|
||||
|
||||
# Step 1: input validation (type-specific) + enums (uniform mechanism).
|
||||
if not file_path and text is None:
|
||||
raise ValueError("either file_path or text is required")
|
||||
spec.validate(inputs)
|
||||
_validate_enums(spec, inputs)
|
||||
|
||||
# Step 2: field derivation (identity for external).
|
||||
inputs = {**inputs, **spec.derive(inputs)}
|
||||
|
||||
# Steps 3-5: stage (if file) + extract + strip.
|
||||
page_count = 0
|
||||
page_offsets = None
|
||||
staged: Path | None = None
|
||||
if file_path:
|
||||
src = Path(file_path)
|
||||
if not src.is_file():
|
||||
raise FileNotFoundError(f"file not found: {src}")
|
||||
await progress("staging", 5, "מעתיק את הקובץ לאחסון")
|
||||
staged = _stage_file(src, spec.staging_root, spec.staging_subdir(inputs))
|
||||
await progress("extracting", 15, "מחלץ טקסט מהקובץ")
|
||||
try:
|
||||
raw_text, page_count, page_offsets = await extractor.extract_text(str(staged))
|
||||
except Exception as e:
|
||||
await progress("failed", 100, f"כשל בחילוץ טקסט: {e}")
|
||||
raise
|
||||
raw_text = extractor.strip_nevo_preamble((raw_text or "")).strip()
|
||||
else:
|
||||
raw_text = (text or "").strip()
|
||||
if not raw_text:
|
||||
await progress("failed", 100, "לא נמצא טקסט בקובץ")
|
||||
raise ValueError("no extractable text in file")
|
||||
|
||||
# Step 6: DB create (type-specific, routed — get case_law_id).
|
||||
await progress("storing_metadata", 25, "שומר את הרשומה במסד הנתונים")
|
||||
display_name = (inputs.get("case_name") or "").strip() or (
|
||||
inputs.get(spec.display_name_fallback) or ""
|
||||
).strip()
|
||||
record = await spec.create_record(
|
||||
full_text=raw_text,
|
||||
case_name=display_name,
|
||||
decision_date=_coerce_date(inputs.get("decision_date")),
|
||||
document_id=document_id,
|
||||
**{k: v for k, v in inputs.items()
|
||||
if k not in {"case_name", "decision_date", "file_path", "text"}},
|
||||
)
|
||||
case_law_id = UUID(str(record["id"]))
|
||||
|
||||
try:
|
||||
stored_chunks = await _chunk_embed_store(case_law_id, raw_text, page_offsets, page_count, progress)
|
||||
|
||||
# Step 9: multimodal — uniform: flag + PDF + page_count, NOT intake type.
|
||||
if (config.MULTIMODAL_ENABLED and page_count > 0
|
||||
and staged is not None and staged.suffix.lower() == ".pdf"):
|
||||
try:
|
||||
await progress("embedding_images", 70, f"מטמיע {page_count} עמודי תמונה (multimodal)")
|
||||
await _embed_pages(case_law_id, staged, page_count)
|
||||
except Exception as e:
|
||||
logger.warning("Multimodal embedding failed (non-fatal): %s", e)
|
||||
|
||||
# Steps 10-12: queue BOTH extractions (GAP-02 fix) + statuses.
|
||||
await db.set_case_law_extraction_status(case_law_id, "completed")
|
||||
await db.set_case_law_halacha_status(case_law_id, "pending")
|
||||
await db.request_metadata_extraction(case_law_id)
|
||||
await db.request_halacha_extraction(case_law_id)
|
||||
|
||||
await progress("completed", 100,
|
||||
f"נקלט: {stored_chunks} chunks. חילוץ הלכות ומטא-דאטה ממתינים בתור.")
|
||||
return {
|
||||
"status": "completed",
|
||||
"case_law_id": str(case_law_id),
|
||||
"chunks": stored_chunks,
|
||||
"halachot": 0,
|
||||
"halachot_pending": True,
|
||||
"metadata_filled": [],
|
||||
"pages": page_count,
|
||||
}
|
||||
except Exception as e:
|
||||
logger.exception("ingest_document failed (%s): %s", spec.source_kind, e)
|
||||
await db.set_case_law_extraction_status(case_law_id, "failed")
|
||||
await progress("failed", 100, f"כשל בעיבוד: {e}")
|
||||
raise
|
||||
|
||||
|
||||
async def _chunk_embed_store(case_law_id, text, page_offsets, page_count, progress) -> int:
|
||||
"""Steps 7-8: chunk (hierarchical/flat by flag) → embed children → store."""
|
||||
if config.PARENT_DOC_RETRIEVAL_ENABLED:
|
||||
await progress("chunking", 40, f"מחלק את הטקסט ל-chunks היררכיים ({page_count} עמ')")
|
||||
h_chunks = chunker.chunk_document_hierarchical(text, page_offsets=page_offsets)
|
||||
if not h_chunks:
|
||||
return 0
|
||||
children = [c for c in h_chunks if c.role == "child"]
|
||||
parents = [c for c in h_chunks if c.role == "parent"]
|
||||
await progress("embedding", 55, f"מייצר embeddings ל-{len(children)} children ({len(parents)} parents)")
|
||||
child_vectors = await embeddings.embed_texts([c.content for c in children], input_type="document")
|
||||
chunk_dicts: list[dict] = []
|
||||
for p in parents:
|
||||
chunk_dicts.append({
|
||||
"role": "parent", "local_id": p.local_id, "parent_local_id": None,
|
||||
"chunk_index": p.chunk_index, "content": p.content,
|
||||
"section_type": p.section_type, "page_number": p.page_number, "embedding": None,
|
||||
})
|
||||
for c, v in zip(children, child_vectors):
|
||||
chunk_dicts.append({
|
||||
"role": "child", "local_id": c.local_id, "parent_local_id": c.parent_local_id,
|
||||
"chunk_index": c.chunk_index, "content": c.content,
|
||||
"section_type": c.section_type, "page_number": c.page_number, "embedding": v,
|
||||
})
|
||||
counts = await db.store_precedent_chunks_hierarchical(case_law_id, chunk_dicts)
|
||||
return counts["children"]
|
||||
else:
|
||||
await progress("chunking", 40, f"מחלק את הטקסט ל-chunks ({page_count} עמ')")
|
||||
chunks = chunker.chunk_document(text, page_offsets=page_offsets)
|
||||
if not chunks:
|
||||
return 0
|
||||
await progress("embedding", 55, f"מייצר embeddings ל-{len(chunks)} chunks")
|
||||
chunk_vectors = await embeddings.embed_texts([c.content for c in chunks], input_type="document")
|
||||
chunk_dicts = [
|
||||
{"chunk_index": c.chunk_index, "content": c.content,
|
||||
"section_type": c.section_type, "page_number": c.page_number, "embedding": v}
|
||||
for c, v in zip(chunks, chunk_vectors)
|
||||
]
|
||||
return await db.store_precedent_chunks(case_law_id, chunk_dicts)
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Verify import**
|
||||
|
||||
Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -c "from legal_mcp.services import ingest; print(ingest.ingest_document.__name__)"`
|
||||
Expected: prints `ingest_document`.
|
||||
|
||||
- [ ] **Step 3: Commit**
|
||||
|
||||
```bash
|
||||
cd ~/legal-ai
|
||||
git add mcp-server/src/legal_mcp/services/ingest.py
|
||||
git commit -m "feat(ingest): canonical ingest_document pipeline (FU-1)"
|
||||
```
|
||||
|
||||
> **Note on `create_record` kwargs:** the wrappers (Tasks 4-5) build `inputs` so the
|
||||
> leftover keys after popping `case_name`/`decision_date`/`file_path`/`text` exactly match
|
||||
> each DB-create's remaining parameters. Verify against the signatures:
|
||||
> `create_external_case_law(case_number, full_text, court, practice_area, appeal_subtype, subject_tags, summary, headnote, source_type, precedent_level, is_binding, ...)`
|
||||
> and `create_internal_committee_decision(case_number, full_text, court, chair_name, district, practice_area, appeal_subtype, subject_tags, summary, is_binding, proceeding_type, ...)`.
|
||||
|
||||
---
|
||||
|
||||
## Task 4: External spec + rewrite `ingest_precedent` as wrapper
|
||||
|
||||
**Files:**
|
||||
- Modify: `mcp-server/src/legal_mcp/services/precedent_library.py`
|
||||
|
||||
- [ ] **Step 1: Replace the top-of-file ingest section with a spec + wrapper**
|
||||
|
||||
Replace the body of `ingest_precedent` (lines ~88-317) and remove `_stage_file`, `_coerce_date`,
|
||||
`_safe_filename`, `_embed_precedent_pages`, and the `_VALID_*` constants used only by ingest.
|
||||
Keep `_VALID_PRACTICE_AREAS`/`_VALID_SOURCE_TYPES` values but move them into the spec. Add:
|
||||
|
||||
```python
|
||||
from legal_mcp.services import ingest
|
||||
|
||||
PRECEDENT_LIBRARY_DIR = Path(config.DATA_DIR) / "precedent-library"
|
||||
|
||||
_VALID_PRACTICE_AREAS = frozenset({"", "rishuy_uvniya", "betterment_levy", "compensation_197"})
|
||||
_VALID_SOURCE_TYPES = frozenset({"", "court_ruling", "appeals_committee"})
|
||||
|
||||
|
||||
def _external_validate(inputs: dict) -> None:
|
||||
citation = (inputs.get("citation") or "").strip()
|
||||
if not citation:
|
||||
raise ValueError("citation is required")
|
||||
if citation.startswith(("ערר ", "ערר(", 'בל"מ ', 'בל"מ(', "ARAR ")):
|
||||
raise ValueError(
|
||||
"ציטוט שמתחיל ב-'ערר' או 'בל\"מ' הוא החלטת ועדת ערר. "
|
||||
"השתמש ב-internal_decision_upload (דורש chair_name + district), "
|
||||
"לא ב-precedent_library_upload."
|
||||
)
|
||||
|
||||
|
||||
def _external_staging_subdir(inputs: dict) -> str:
|
||||
st = inputs.get("source_type") or ""
|
||||
return st if st in {"court_ruling", "appeals_committee"} else "other"
|
||||
|
||||
|
||||
_EXTERNAL_SPEC = ingest.IntakeSpec(
|
||||
source_kind="external_upload",
|
||||
id_field="citation",
|
||||
staging_root=PRECEDENT_LIBRARY_DIR,
|
||||
staging_subdir=_external_staging_subdir,
|
||||
validate=_external_validate,
|
||||
enum_fields={"practice_area": _VALID_PRACTICE_AREAS, "source_type": _VALID_SOURCE_TYPES},
|
||||
derive=lambda inputs: {},
|
||||
display_name_fallback="citation",
|
||||
create_record=_create_external_record,
|
||||
)
|
||||
|
||||
|
||||
async def _create_external_record(**kw) -> dict:
|
||||
"""Adapter: maps canonical inputs (citation) to create_external_case_law(case_number)."""
|
||||
return await db.create_external_case_law(
|
||||
case_number=kw["citation"].strip(),
|
||||
case_name=kw["case_name"],
|
||||
full_text=kw["full_text"],
|
||||
court=(kw.get("court") or "").strip(),
|
||||
decision_date=kw.get("decision_date"),
|
||||
practice_area=kw.get("practice_area", ""),
|
||||
appeal_subtype=(kw.get("appeal_subtype") or "").strip(),
|
||||
subject_tags=list(kw.get("subject_tags") or []),
|
||||
summary=(kw.get("summary") or "").strip(),
|
||||
headnote=(kw.get("headnote") or "").strip(),
|
||||
source_type=kw.get("source_type", ""),
|
||||
precedent_level=kw.get("precedent_level", ""),
|
||||
is_binding=kw.get("is_binding", True),
|
||||
document_id=kw.get("document_id"),
|
||||
)
|
||||
|
||||
|
||||
async def ingest_precedent(
|
||||
*,
|
||||
file_path: str | Path,
|
||||
citation: str,
|
||||
case_name: str = "",
|
||||
court: str = "",
|
||||
decision_date=None,
|
||||
source_type: str = "",
|
||||
precedent_level: str = "",
|
||||
practice_area: str = "",
|
||||
appeal_subtype: str = "",
|
||||
subject_tags: list[str] | None = None,
|
||||
is_binding: bool = True,
|
||||
headnote: str = "",
|
||||
summary: str = "",
|
||||
document_id: UUID | None = None,
|
||||
progress: ingest.ProgressCb | None = None,
|
||||
) -> dict:
|
||||
"""Ingest one external precedent. Thin wrapper over the canonical pipeline."""
|
||||
inputs = {
|
||||
"citation": citation, "case_name": case_name, "court": court,
|
||||
"decision_date": decision_date, "source_type": source_type,
|
||||
"precedent_level": precedent_level, "practice_area": practice_area,
|
||||
"appeal_subtype": appeal_subtype, "subject_tags": subject_tags,
|
||||
"is_binding": is_binding, "headnote": headnote, "summary": summary,
|
||||
}
|
||||
return await ingest.ingest_document(
|
||||
_EXTERNAL_SPEC, inputs=inputs, file_path=file_path,
|
||||
document_id=document_id, progress=progress,
|
||||
)
|
||||
```
|
||||
|
||||
> Define `_create_external_record` ABOVE `_EXTERNAL_SPEC` (Python resolves the name at
|
||||
> dataclass-construction time). Reorder if needed.
|
||||
|
||||
- [ ] **Step 2: Run external-path tests**
|
||||
|
||||
Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_unified_ingest.py -k "external" -v`
|
||||
Expected: `test_external_queues_both`, `test_enum_validation_rejects_bad_practice_area_external`,
|
||||
`test_external_citation_guard_still_blocks_arar` PASS.
|
||||
|
||||
- [ ] **Step 3: Commit**
|
||||
|
||||
```bash
|
||||
cd ~/legal-ai
|
||||
git add mcp-server/src/legal_mcp/services/precedent_library.py
|
||||
git commit -m "refactor(ingest): ingest_precedent delegates to canonical pipeline (FU-1)"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 5: Internal spec + rewrite `ingest_internal_decision` as wrapper
|
||||
|
||||
**Files:**
|
||||
- Modify: `mcp-server/src/legal_mcp/services/internal_decisions.py`
|
||||
|
||||
- [ ] **Step 1: Replace the ingest section with a spec + wrapper**
|
||||
|
||||
Remove `_coerce_date`, `_safe_filename`, and the inline pipeline body of
|
||||
`ingest_internal_decision` (lines ~73-220). Keep `_VALID_DISTRICTS`, `_COURT_TO_DISTRICT`,
|
||||
`_district_from_court`, and all migrate_*/enrich_*/search_internal functions. Add:
|
||||
|
||||
```python
|
||||
from legal_mcp.services import ingest
|
||||
|
||||
INTERNAL_DECISIONS_DIR = Path(config.DATA_DIR) / "internal-decisions"
|
||||
|
||||
_VALID_PRACTICE_AREAS = frozenset({"", "rishuy_uvniya", "betterment_levy", "compensation_197"})
|
||||
_VALID_DISTRICTS = frozenset({"", "ירושלים", "מרכז", "תל אביב", "צפון", "דרום", "ארצי"})
|
||||
|
||||
|
||||
def _internal_validate(inputs: dict) -> None:
|
||||
if not (inputs.get("case_number") or "").strip():
|
||||
raise ValueError("case_number is required")
|
||||
|
||||
|
||||
def _internal_derive(inputs: dict) -> dict:
|
||||
district = (inputs.get("district") or "").strip() or _district_from_court(inputs.get("court") or "")
|
||||
proc = (inputs.get("proceeding_type") or "").strip() or derive_proceeding_type(
|
||||
appeal_subtype=inputs.get("appeal_subtype") or "", subject=inputs.get("case_name") or "",
|
||||
)
|
||||
return {"district": district, "proceeding_type": proc}
|
||||
|
||||
|
||||
async def _create_internal_record(**kw) -> dict:
|
||||
return await db.create_internal_committee_decision(
|
||||
case_number=kw["case_number"].strip(),
|
||||
case_name=kw["case_name"],
|
||||
full_text=kw["full_text"],
|
||||
court=(kw.get("court") or "").strip(),
|
||||
decision_date=kw.get("decision_date"),
|
||||
chair_name=(kw.get("chair_name") or "").strip(),
|
||||
district=kw.get("district", ""),
|
||||
practice_area=kw.get("practice_area", ""),
|
||||
appeal_subtype=(kw.get("appeal_subtype") or "").strip(),
|
||||
subject_tags=list(kw.get("subject_tags") or []),
|
||||
summary=(kw.get("summary") or "").strip(),
|
||||
is_binding=kw.get("is_binding", True),
|
||||
document_id=kw.get("document_id"),
|
||||
proceeding_type=kw.get("proceeding_type") or "ערר",
|
||||
)
|
||||
|
||||
|
||||
_INTERNAL_SPEC = ingest.IntakeSpec(
|
||||
source_kind="internal_committee",
|
||||
id_field="case_number",
|
||||
staging_root=INTERNAL_DECISIONS_DIR,
|
||||
staging_subdir=lambda inputs: (inputs.get("district") or "other"),
|
||||
validate=_internal_validate,
|
||||
enum_fields={"practice_area": _VALID_PRACTICE_AREAS, "district": _VALID_DISTRICTS},
|
||||
derive=_internal_derive,
|
||||
display_name_fallback="case_number",
|
||||
create_record=_create_internal_record,
|
||||
)
|
||||
|
||||
|
||||
async def ingest_internal_decision(
|
||||
*,
|
||||
case_number: str,
|
||||
case_name: str = "",
|
||||
court: str = "",
|
||||
decision_date=None,
|
||||
chair_name: str = "",
|
||||
district: str = "",
|
||||
practice_area: str = "",
|
||||
appeal_subtype: str = "",
|
||||
subject_tags: list[str] | None = None,
|
||||
summary: str = "",
|
||||
is_binding: bool = True,
|
||||
file_path: str | Path | None = None,
|
||||
text: str | None = None,
|
||||
document_id: UUID | None = None,
|
||||
queue_halachot: bool = True, # retained for signature compat; pipeline always queues
|
||||
proceeding_type: str = "",
|
||||
) -> dict:
|
||||
"""Ingest one appeals-committee decision. Thin wrapper over the canonical pipeline."""
|
||||
inputs = {
|
||||
"case_number": case_number, "case_name": case_name, "court": court,
|
||||
"decision_date": decision_date, "chair_name": chair_name, "district": district,
|
||||
"practice_area": practice_area, "appeal_subtype": appeal_subtype,
|
||||
"subject_tags": subject_tags, "summary": summary, "is_binding": is_binding,
|
||||
"proceeding_type": proceeding_type,
|
||||
}
|
||||
out = await ingest.ingest_document(
|
||||
_INTERNAL_SPEC, inputs=inputs, file_path=file_path, text=text,
|
||||
document_id=document_id,
|
||||
)
|
||||
return {"status": out["status"], "case_law_id": out["case_law_id"],
|
||||
"chunks": out["chunks"], "halachot_pending": True}
|
||||
```
|
||||
|
||||
> `queue_halachot=False` was only used by `migrate_from_style_corpus`. The canonical pipeline
|
||||
> always queues both (per INV-ING3). Confirm with the user during execution that bulk
|
||||
> re-migration queueing is acceptable; the migrate path is out of FU-1 scope but calls this
|
||||
> wrapper. If suppression is still required, that is a follow-up — note it, do not silently drop.
|
||||
|
||||
- [ ] **Step 2: Run the full test file**
|
||||
|
||||
Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_unified_ingest.py -v`
|
||||
Expected: ALL 9 tests PASS — including `test_internal_queues_BOTH_metadata_and_halacha` (GAP-02).
|
||||
|
||||
- [ ] **Step 3: Commit**
|
||||
|
||||
```bash
|
||||
cd ~/legal-ai
|
||||
git add mcp-server/src/legal_mcp/services/internal_decisions.py
|
||||
git commit -m "refactor(ingest): ingest_internal_decision delegates to canonical pipeline; queue metadata too (GAP-02, FU-1)"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 6: Dead-code sweep, smoke import, full suite
|
||||
|
||||
**Files:**
|
||||
- Verify: `mcp-server/src/legal_mcp/services/precedent_library.py`, `internal_decisions.py`
|
||||
|
||||
- [ ] **Step 1: Confirm no orphaned references to removed helpers**
|
||||
|
||||
Run: `cd ~/legal-ai/mcp-server && grep -rn "_embed_precedent_pages\|_stage_file\|_safe_filename\|_coerce_date" src/legal_mcp/services/precedent_library.py src/legal_mcp/services/internal_decisions.py`
|
||||
Expected: NO matches (all moved to `ingest.py`). If any remain in code paths other than ingest, leave them; if orphaned, delete.
|
||||
|
||||
- [ ] **Step 2: Smoke-import every affected module + its callers**
|
||||
|
||||
Run:
|
||||
```bash
|
||||
cd ~/legal-ai/mcp-server && .venv/bin/python -c "
|
||||
from legal_mcp.services import ingest, precedent_library, internal_decisions
|
||||
from legal_mcp.tools import precedent_library as t1, internal_decisions as t2
|
||||
import inspect
|
||||
sig_p = inspect.signature(precedent_library.ingest_precedent)
|
||||
sig_i = inspect.signature(internal_decisions.ingest_internal_decision)
|
||||
assert 'citation' in sig_p.parameters and 'file_path' in sig_p.parameters
|
||||
assert 'case_number' in sig_i.parameters and 'text' in sig_i.parameters
|
||||
print('signatures preserved; imports clean')
|
||||
"
|
||||
```
|
||||
Expected: prints `signatures preserved; imports clean`.
|
||||
|
||||
- [ ] **Step 3: Run the entire test suite (no regressions elsewhere)**
|
||||
|
||||
Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/ -q`
|
||||
Expected: all pre-existing tests still pass + the 9 new ones.
|
||||
|
||||
- [ ] **Step 4: Lint the changed files (match repo style)**
|
||||
|
||||
Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m ruff check src/legal_mcp/services/ingest.py src/legal_mcp/services/precedent_library.py src/legal_mcp/services/internal_decisions.py 2>/dev/null || echo "ruff not configured — skip"`
|
||||
Expected: clean, or "skip".
|
||||
|
||||
- [ ] **Step 5: Update TaskMaster #59 → done**
|
||||
|
||||
Mark subtasks 59.1-59.4 and task 59 as done via task-master (verify via MCP get_task).
|
||||
|
||||
- [ ] **Step 6: Final commit**
|
||||
|
||||
```bash
|
||||
cd ~/legal-ai
|
||||
git add -A mcp-server/
|
||||
git commit -m "chore(ingest): dead-code sweep + smoke checks for unified pipeline (FU-1)"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Self-Review Notes
|
||||
|
||||
- **GAP-01** (single path) → Tasks 2-5. **GAP-02** (metadata queue) → Task 3 step 1 + test `test_internal_queues_BOTH_metadata_and_halacha`. **GAP-04** (enum validation) → `_validate_enums` + tests. **GAP-05** (staging/derive/multimodal/fallback/guard unified) → Task 3 + specs in Tasks 4-5.
|
||||
- **Boundary preserved:** DB-create functions untouched (routed via `create_record`); no migration.
|
||||
- **Open execution check:** `queue_halachot=False` suppression in `migrate_from_style_corpus` (Task 5 note) — surface to user, do not silently change bulk-migration behavior.
|
||||
- **claude_session rule:** `ingest.py` imports only db/chunker/embeddings/extractor — no LLM extractors. Safe for container.
|
||||
150
docs/superpowers/specs/2026-05-30-fu1-unified-ingest-design.md
Normal file
150
docs/superpowers/specs/2026-05-30-fu1-unified-ingest-design.md
Normal file
@@ -0,0 +1,150 @@
|
||||
# FU-1 — איחוד מסלול-הקליטה (Unified Ingest Path) — עיצוב
|
||||
|
||||
**סטטוס:** מאושר-לעיצוב · **תאריך:** 2026-05-30 · **ענף:** TBD (ייפתח בביצוע)
|
||||
**מכסה:** GAP-01, GAP-02, GAP-04, GAP-05 · **מספק:** INV-ING1, INV-ING3, INV-G2, INV-G4
|
||||
**מקורות:** [docs/spec/01-ingest.md](../../spec/01-ingest.md), [docs/spec/gap-audit.md](../../spec/gap-audit.md) · **משימה:** TaskMaster #59 (legal-ai)
|
||||
**סוג-עבודה:** pure-code · **מיגרציה:** אין (אומת מול DB 2026-05-30 — ראה §6)
|
||||
|
||||
---
|
||||
|
||||
## 1. הבעיה
|
||||
|
||||
שתי פונקציות-קליטה מקבילות לישויות-אחיות, שמשכפלות את צעדי 2–10 של הפייפליין ומתפצלות
|
||||
בפרטים:
|
||||
|
||||
- `services/precedent_library.py::ingest_precedent` (פסיקה חיצונית, `source_kind='external_upload'`)
|
||||
- `services/internal_decisions.py::ingest_internal_decision` (החלטות-ועדה, `source_kind='internal_committee'`)
|
||||
|
||||
מסלולים מקבילים גוררים drift — צעד שקיים באחד וחסר באחר. הביטוי הקונקרטי: **GAP-02** —
|
||||
המסלול הפנימי מתזמן רק `request_halacha_extraction` ולא `request_metadata_extraction`,
|
||||
ולכן החלטות-ועדה נקלטו בלי metadata. שש אסימטריות נוספות (GAP-01/04/05) מתועדות ב-01-ingest §4.
|
||||
|
||||
## 2. ההכרעה האדריכלית (מאומתת)
|
||||
|
||||
**Template Method skeleton + Strategy via config object.** פונקציה קנונית אחת מריצה את
|
||||
שלד-הפייפליין (סדר-צעדים אכיף); כל מה שמשתנה לפי סוג נישא ב-config object מוזרק (`IntakeSpec`).
|
||||
שתי הפונקציות הציבוריות נשמרות כ-API בעל-שם ומאצילות לליבה.
|
||||
|
||||
| החלטה | נימוק | מקורות (≥3) |
|
||||
|-------|--------|-------------|
|
||||
| מסלול קנוני יחיד (לא 2 מקבילים, לא ליבה-משותפת בין 2 כניסות) | Template Method: "אלגוריתמים כמעט-זהים עם הבדלים קטנים" → שלד אחד אוכף סדר-צעדים, צעד-חסר נעשה בלתי-אפשרי | refactoring.guru (Template Method); SourceMaking (Strategy); ADF parameterized-pipelines |
|
||||
| שמירת `ingest_precedent`/`ingest_internal_decision` כ-API ציבורי | Fowler FlagArgument: "separate methods communicate more clearly"; ליבה משותפת מוסתרת כשהלוגיקה שזורה | martinfowler.com/bliki/FlagArgument; ardalis; luzkan smells |
|
||||
| ווריאציה ב-config object (`IntakeSpec`), לא boolean-flags | flag-argument הוא code smell; config object נותן בהירות + הרחבה | Fowler; ardalis; dev.to flag-anti-pattern |
|
||||
| `validate` כ-callable, `enum_fields` כ-data | callable להטרוגני (Strategy idiomatic ב-Python first-class), data להומוגני ("אל תכפה הכל ל-strategy") | Strategy/Wikipedia; Functional-Strategy dev.to; Strategy-in-Python Medium |
|
||||
| `create_record` כ-callable מוזרק, לא `if source_kind` | Replace Conditional with Polymorphism + factory injection ("tell, don't ask") | refactoring.guru (Replace-Conditional); code-maze (Factory+DI); c-sharpcorner |
|
||||
|
||||
## 3. מבנה מודולים
|
||||
|
||||
**מודול חדש:** `mcp-server/src/legal_mcp/services/ingest.py`
|
||||
|
||||
```
|
||||
services/ingest.py ← חדש (בית המסלול הקנוני)
|
||||
├── IntakeSpec (frozen dataclass — מתאר-הסוג)
|
||||
├── async ingest_document(spec, *, file_path|text, inputs, progress) ← ליבה: צעדים 1–10
|
||||
├── _stage_file(src, root, subdir) (אחיד — מאוחד משני הקבצים)
|
||||
├── _coerce_date / _safe_filename (אחיד — היום משוכפל)
|
||||
└── _embed_pages(case_law_id, pdf, n) (עובר מ-precedent_library.py — צעד 7 אחיד)
|
||||
```
|
||||
|
||||
**API ציבורי — חתימה ללא שינוי לקוראים:**
|
||||
- `precedent_library.py::ingest_precedent(...)` → בונה `_EXTERNAL_SPEC`, קורא `ingest.ingest_document(...)`.
|
||||
- `internal_decisions.py::ingest_internal_decision(...)` → בונה `_INTERNAL_SPEC`, קורא `ingest.ingest_document(...)`.
|
||||
|
||||
**לא זז (גבול FU-2):** `db.create_external_case_law` / `db.create_internal_committee_decision`
|
||||
נשארות נפרדות; מנותבות דרך `IntakeSpec.create_record`. כל שאר הפונקציות בשני קבצי-השירות
|
||||
(search_*, migrate_*, reextract_*, process_pending_extractions, enrich_*) **לא נוגעים בהן**.
|
||||
|
||||
**הקוראים שלא משתנים:** MCP tools (`tools/precedent_library.py`, `tools/internal_decisions.py`)
|
||||
וה-HTTP API ב-`web/` ממשיכים לקרוא לאותן שתי פונקציות ציבוריות.
|
||||
|
||||
## 4. ה-IntakeSpec
|
||||
|
||||
```python
|
||||
@dataclass(frozen=True)
|
||||
class IntakeSpec:
|
||||
source_kind: str # 'external_upload' | 'internal_committee'
|
||||
id_field: str # 'citation' | 'case_number' (לוג/שגיאות)
|
||||
staging_root: Path # PRECEDENT_LIBRARY_DIR | INTERNAL_DECISIONS_DIR
|
||||
staging_subdir: Callable[[dict], str] # inputs → subdir (source_type | district | 'other')
|
||||
validate: Callable[[dict], None] # מרים ValueError (citation-guard / chair_name-חובה)
|
||||
enum_fields: dict[str, frozenset[str]] # נאכף לשני הסוגים (GAP-04)
|
||||
derive: Callable[[dict], dict] # שדות-נגזרים (district, proceeding_type); identity לחיצוני
|
||||
display_name_fallback: str # שם-השדה כשחסר case_name ('citation'|'case_number')
|
||||
create_record: Callable[..., Awaitable[dict]] # create_external_case_law | create_internal_committee_decision
|
||||
```
|
||||
|
||||
הליבה `ingest_document` **לא יודעת** איזה סוג רץ — רק מפעילה את ה-hooks בנקודות מוגדרות.
|
||||
|
||||
## 5. הפייפליין הקנוני (צעדים 1–10, לפי 01-ingest §2)
|
||||
|
||||
סדר-הביצוע בפועל (ה-DB-create מוקדם — נדרש `case_law_id` לפני אחסון chunks; תואם את הקוד הקיים):
|
||||
|
||||
| # | צעד | אחיד? | מקור-וריאציה |
|
||||
|---|------|-------|---------------|
|
||||
| 1 | ולידציית-קלט + enums | מנגנון אחיד | `spec.validate` + `spec.enum_fields` |
|
||||
| 2 | גזירת-שדות | מנגנון אחיד | `spec.derive` (identity לחיצוני) |
|
||||
| 3 | Stage file | מנגנון אחיד | `spec.staging_root` + `spec.staging_subdir` |
|
||||
| 4 | Extract text (טקסט-ריק = כשל מדווח) | ✅ מלא | — (internal גם מקבל `text` ישיר, בלי קובץ) |
|
||||
| 5 | Strip Nevo preamble | ✅ מלא | — |
|
||||
| 6 | **DB create → `case_law_id`** (ספציפי-לסוג) | מנותב | `spec.create_record` (+ `display_name_fallback`) |
|
||||
| 7 | Chunk (hierarchical/flat לפי `PARENT_DOC_RETRIEVAL_ENABLED`) | ✅ מלא | — (flag, לא סוג) |
|
||||
| 8 | Embed children + Store chunks | ✅ מלא | — |
|
||||
| 9 | **Multimodal page-image embed** (flag+PDF+page_count>0) | ✅ מלא | — (**GAP-05 fix**: היה רק בחיצוני) |
|
||||
| 10 | **Queue metadata extraction** | ✅ מלא | — (**GAP-02 fix**: היה רק בחיצוני) |
|
||||
| 11 | Queue halacha extraction | ✅ מלא | — |
|
||||
| 12 | Set statuses (extraction=completed, halacha=pending) | ✅ מלא | — |
|
||||
|
||||
> הערה: 01-ingest §2 ממספר 1–10 בלי למנות מפורשות את ה-DB-create; כאן הוא צעד 6 כי הוא קודם
|
||||
> ל-chunking בקוד בפועל. שגיאת-עיבוד אחרי create → `extraction_status=failed` (כמו היום).
|
||||
|
||||
**אילוץ `claude_session`:** הליבה רק **מתזמנת** (`request_*_extraction` — כתיבת-DB טהורה).
|
||||
אין import של `halacha_extractor`/`precedent_metadata_extractor` במסלול-הקליטה — נשמר כפי שהיום.
|
||||
|
||||
## 6. שינויי-התנהגות וסיכון
|
||||
|
||||
| שינוי | השפעה | סיכון |
|
||||
|--------|--------|--------|
|
||||
| **GAP-02**: internal עכשיו מתזמן metadata | החלטות-ועדה חדשות יקבלו headnote/summary/tags | אין — תיקון; רשומות קיימות כבר מלאות (0/56 חסר) |
|
||||
| **GAP-04**: ולידציית-enums על internal | קלט עם practice_area לא-חוקי יידחה בקליטה | נמוך — כל 56 הקיימות חוקיות; בודקים שקוראי-internal מעבירים ערכים חוקיים |
|
||||
| **GAP-05 multimodal**: internal PDF עכשיו מטמיע עמודים | החלטות-ועדה חדשות PDF יקבלו page-images | אין (non-fatal); קיימות → backfill ב-**TaskMaster 61.2 (FU-3)** |
|
||||
| **GAP-05 fallback/staging/derive/guard**: מאוחדים | התנהגות זהה, מסלול אחד | אין — citation-guard נשמר ב-`_EXTERNAL_SPEC.validate` |
|
||||
|
||||
**אין מיגרציה (אומת מול DB 2026-05-30):** internal_committee = 56 רשומות; metadata חסר = **0**;
|
||||
enums לא-חוקיים = **0**; multimodal: 14/56 יש (42 חסר → FU-3 #61.2). הריפקטור משנה רק התנהגות
|
||||
*קדימה*; אינו נוגע בנתונים שמורים.
|
||||
|
||||
**Drift מתועד (זניח, מכוון — מסקירת-קוד סופית):**
|
||||
- **empty-chunks early-return:** כשה-chunker מחזיר ריק על טקסט לא-ריק (נדיר), המקור הציב
|
||||
`halacha_status=completed` ויצא בלי לתזמן; הקנוני נופל הלאה ומתזמן את שני החילוצים עם
|
||||
`halacha_status=pending`. עקבי עם INV-ING3 (תיזמון אחיד) — שיפור, לא רגרסיה.
|
||||
- **thumbnails של multimodal** להחלטות-ועדה יושבים תחת `precedent-library/thumbnails/`
|
||||
(ממופתח לפי `case_law_id`) — מכוון, מתועד ב-docstring של `spec_thumb_dir`.
|
||||
- **`queue_halachot`** הוסר כליל (wrapper + `migrate_from_style_corpus`) — הדגל איבד משמעות
|
||||
תחת INV-ING3; אומת שאין caller שמעביר אותו.
|
||||
|
||||
## 7. אסטרטגיית בדיקה
|
||||
|
||||
pytest offline עם monkeypatch לכל גבולות-ה-I/O (db, embeddings, chunker, extractor) — כתבנית
|
||||
[tests/test_search_domain_scope.py](../../../mcp-server/tests/test_search_domain_scope.py)
|
||||
ו-[tests/test_precedent_corpus_isolation.py](../../../mcp-server/tests/test_precedent_corpus_isolation.py).
|
||||
קובץ חדש: `mcp-server/tests/test_unified_ingest.py`. רץ עם `.venv` המקומי.
|
||||
|
||||
מקרי-בדיקה (TDD — נכשלים לפני, עוברים אחרי):
|
||||
1. **regression GAP-02** — `ingest_internal_decision` מתזמן גם metadata **וגם** halacha (לוכד את הבאג המקורי).
|
||||
2. שני הסוגים זורמים דרך `ingest.ingest_document` (לא דרך גוף-קוד נפרד).
|
||||
3. ולידציית-enum דוחה `practice_area` לא-חוקי בשני הסוגים (GAP-04).
|
||||
4. citation-guard עדיין חוסם ציטוט `ערר`/`בל"מ` במסלול החיצוני.
|
||||
5. staging-subdir נפתר נכון (source_type לחיצוני, district לפנימי, 'other' ל-fallback).
|
||||
6. מסלול-`text` (פנימי, בלי קובץ) ומסלול-`file_path` שניהם עובדים.
|
||||
7. multimodal מותנה flag+PDF+page_count — **לא** בסוג-ה-intake; PDF פנימי → מטמיע, text → לא.
|
||||
8. fallback לשם-תצוגה: חסר case_name → נופל למזהה הקנוני הנכון לכל סוג.
|
||||
9. אידמפוטנטיות-חתימה: ערכי-החזרה של שתי הפונקציות הציבוריות נשמרים (תאימות-קוראים).
|
||||
|
||||
## 8. סדר-ביצוע
|
||||
|
||||
1. כתיבת `test_unified_ingest.py` (אדום).
|
||||
2. `services/ingest.py` — `IntakeSpec` + `ingest_document` + הזזת `_embed_pages` + helpers אחידים.
|
||||
3. `_EXTERNAL_SPEC` + צמצום `ingest_precedent` ל-wrapper.
|
||||
4. `_INTERNAL_SPEC` + צמצום `ingest_internal_decision` ל-wrapper.
|
||||
5. הרצת הבדיקות (ירוק) + lint.
|
||||
6. בדיקת-עשן: import של שני קבצי-השירות + ה-MCP tools (ללא שבירת חתימות).
|
||||
257
mcp-server/src/legal_mcp/services/ingest.py
Normal file
257
mcp-server/src/legal_mcp/services/ingest.py
Normal file
@@ -0,0 +1,257 @@
|
||||
"""Canonical ingest pipeline (FU-1).
|
||||
|
||||
One pipeline for all sibling-entity intake types (external precedent,
|
||||
internal committee decision). Per-type variation rides on an ``IntakeSpec``
|
||||
config object — never a parallel function. See
|
||||
docs/spec/01-ingest.md and docs/superpowers/specs/2026-05-30-fu1-unified-ingest-design.md.
|
||||
|
||||
claude_session rule preserved: this module only QUEUES extraction
|
||||
(``request_*_extraction`` = pure DB writes). It never imports
|
||||
halacha_extractor / precedent_metadata_extractor, so it is safe to call
|
||||
from the FastAPI container where the ``claude`` CLI is unavailable.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import re
|
||||
import shutil
|
||||
from dataclasses import dataclass
|
||||
from datetime import date
|
||||
from pathlib import Path
|
||||
from typing import Awaitable, Callable
|
||||
from uuid import UUID, uuid4
|
||||
|
||||
from legal_mcp import config
|
||||
from legal_mcp.services import chunker, db, embeddings, extractor
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
ProgressCb = Callable[[str, int, str], Awaitable[None]]
|
||||
|
||||
|
||||
async def _noop_progress(_status: str, _percent: int, _msg: str) -> None:
|
||||
return None
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class IntakeSpec:
|
||||
"""Describes everything that varies between intake types."""
|
||||
source_kind: str
|
||||
id_field: str
|
||||
staging_root: Path
|
||||
staging_subdir: Callable[[dict], str]
|
||||
validate: Callable[[dict], None]
|
||||
enum_fields: dict[str, frozenset[str]]
|
||||
derive: Callable[[dict], dict]
|
||||
display_name_fallback: str
|
||||
create_record: Callable[..., Awaitable[dict]]
|
||||
|
||||
|
||||
def _coerce_date(value) -> date | None:
|
||||
if value is None or value == "":
|
||||
return None
|
||||
if isinstance(value, date):
|
||||
return value
|
||||
if isinstance(value, str):
|
||||
try:
|
||||
return date.fromisoformat(value[:10])
|
||||
except ValueError:
|
||||
return None
|
||||
return None
|
||||
|
||||
|
||||
def _safe_filename(name: str) -> str:
|
||||
base = Path(name).name
|
||||
return re.sub(r"[^\w.\-+א-ת ]", "_", base) or f"upload-{uuid4().hex[:8]}"
|
||||
|
||||
|
||||
def _stage_file(src_path: Path, root: Path, subdir: str) -> Path:
|
||||
dest_dir = root / (subdir or "other")
|
||||
dest_dir.mkdir(parents=True, exist_ok=True)
|
||||
dest = dest_dir / f"{uuid4().hex[:8]}_{_safe_filename(src_path.name)}"
|
||||
shutil.copy2(src_path, dest)
|
||||
return dest
|
||||
|
||||
|
||||
def _validate_enums(spec: IntakeSpec, inputs: dict) -> None:
|
||||
for field_name, allowed in spec.enum_fields.items():
|
||||
value = inputs.get(field_name, "") or ""
|
||||
if value not in allowed:
|
||||
raise ValueError(f"invalid {field_name}: {value!r}")
|
||||
|
||||
|
||||
async def _embed_pages(case_law_id: UUID, pdf_path: Path, page_count: int) -> dict:
|
||||
"""Render PDF pages → embed via voyage-multimodal → store. Non-fatal caller."""
|
||||
thumb_dir = spec_thumb_dir(case_law_id)
|
||||
rendered = await asyncio.to_thread(
|
||||
extractor.render_pages_for_multimodal,
|
||||
pdf_path, config.MULTIMODAL_DPI, config.MULTIMODAL_THUMB_DPI, thumb_dir,
|
||||
)
|
||||
images = [pil for pil, _ in rendered]
|
||||
thumbs = [t for _, t in rendered]
|
||||
img_embs = await embeddings.embed_images(images)
|
||||
|
||||
page_records = []
|
||||
for i, (emb, thumb) in enumerate(zip(img_embs, thumbs)):
|
||||
rel_thumb = None
|
||||
if thumb is not None:
|
||||
try:
|
||||
rel_thumb = str(thumb.relative_to(config.DATA_DIR))
|
||||
except ValueError:
|
||||
rel_thumb = str(thumb)
|
||||
page_records.append({
|
||||
"page_number": i + 1, "embedding": emb, "image_thumbnail_path": rel_thumb,
|
||||
})
|
||||
stored = await db.store_precedent_image_embeddings(
|
||||
case_law_id, page_records, model_name=config.MULTIMODAL_MODEL,
|
||||
)
|
||||
logger.info("Multimodal: stored %d page-image embeddings for case_law %s", stored, case_law_id)
|
||||
return {"pages_embedded": stored}
|
||||
|
||||
|
||||
def spec_thumb_dir(case_law_id: UUID) -> Path:
|
||||
"""Thumbnails live under the precedent-library tree regardless of intake type."""
|
||||
return Path(config.DATA_DIR) / "precedent-library" / "thumbnails" / str(case_law_id)
|
||||
|
||||
|
||||
async def ingest_document(
|
||||
spec: IntakeSpec,
|
||||
*,
|
||||
inputs: dict,
|
||||
file_path: str | Path | None = None,
|
||||
text: str | None = None,
|
||||
document_id: UUID | None = None,
|
||||
progress: ProgressCb | None = None,
|
||||
) -> dict:
|
||||
"""Run the canonical 12-step pipeline for one intake item.
|
||||
|
||||
``inputs`` carries the type-specific record fields (citation/case_number,
|
||||
case_name, court, practice_area, etc.). ``spec`` decides how they are
|
||||
validated, staged, derived, and which DB-create runs. Returns a dict with
|
||||
at least: status, case_law_id, chunks.
|
||||
"""
|
||||
progress = progress or _noop_progress
|
||||
|
||||
# Step 1: input validation (type-specific) + enums (uniform mechanism).
|
||||
if not file_path and text is None:
|
||||
raise ValueError("either file_path or text is required")
|
||||
spec.validate(inputs)
|
||||
_validate_enums(spec, inputs)
|
||||
|
||||
# Step 2: field derivation (identity for external).
|
||||
inputs = {**inputs, **spec.derive(inputs)}
|
||||
|
||||
# Steps 3-5: stage (if file) + extract + strip.
|
||||
page_count = 0
|
||||
page_offsets = None
|
||||
staged: Path | None = None
|
||||
if file_path:
|
||||
src = Path(file_path)
|
||||
if not src.is_file():
|
||||
raise FileNotFoundError(f"file not found: {src}")
|
||||
await progress("staging", 5, "מעתיק את הקובץ לאחסון")
|
||||
staged = _stage_file(src, spec.staging_root, spec.staging_subdir(inputs))
|
||||
await progress("extracting", 15, "מחלץ טקסט מהקובץ")
|
||||
try:
|
||||
raw_text, page_count, page_offsets = await extractor.extract_text(str(staged))
|
||||
except Exception as e:
|
||||
await progress("failed", 100, f"כשל בחילוץ טקסט: {e}")
|
||||
raise
|
||||
raw_text = extractor.strip_nevo_preamble((raw_text or "")).strip()
|
||||
else:
|
||||
raw_text = (text or "").strip()
|
||||
if not raw_text:
|
||||
await progress("failed", 100, "לא נמצא טקסט בקובץ")
|
||||
raise ValueError("no extractable text in file")
|
||||
|
||||
# Step 6: DB create (type-specific, routed — get case_law_id).
|
||||
await progress("storing_metadata", 25, "שומר את הרשומה במסד הנתונים")
|
||||
display_name = (inputs.get("case_name") or "").strip() or (
|
||||
inputs.get(spec.display_name_fallback) or ""
|
||||
).strip()
|
||||
record = await spec.create_record(
|
||||
full_text=raw_text,
|
||||
case_name=display_name,
|
||||
decision_date=_coerce_date(inputs.get("decision_date")),
|
||||
document_id=document_id,
|
||||
**{k: v for k, v in inputs.items()
|
||||
if k not in {"case_name", "decision_date", "file_path", "text"}},
|
||||
)
|
||||
case_law_id = UUID(str(record["id"]))
|
||||
|
||||
try:
|
||||
stored_chunks = await _chunk_embed_store(case_law_id, raw_text, page_offsets, page_count, progress)
|
||||
|
||||
# Step 9: multimodal — uniform: flag + PDF + page_count, NOT intake type.
|
||||
if (config.MULTIMODAL_ENABLED and page_count > 0
|
||||
and staged is not None and staged.suffix.lower() == ".pdf"):
|
||||
try:
|
||||
await progress("embedding_images", 70, f"מטמיע {page_count} עמודי תמונה (multimodal)")
|
||||
await _embed_pages(case_law_id, staged, page_count)
|
||||
except Exception as e:
|
||||
logger.warning("Multimodal embedding failed (non-fatal): %s", e)
|
||||
|
||||
# Steps 10-12: queue BOTH extractions (GAP-02 fix) + statuses.
|
||||
await db.set_case_law_extraction_status(case_law_id, "completed")
|
||||
await db.set_case_law_halacha_status(case_law_id, "pending")
|
||||
await db.request_metadata_extraction(case_law_id)
|
||||
await db.request_halacha_extraction(case_law_id)
|
||||
|
||||
await progress("completed", 100,
|
||||
f"נקלט: {stored_chunks} chunks. חילוץ הלכות ומטא-דאטה ממתינים בתור.")
|
||||
return {
|
||||
"status": "completed",
|
||||
"case_law_id": str(case_law_id),
|
||||
"chunks": stored_chunks,
|
||||
"halachot": 0,
|
||||
"halachot_pending": True,
|
||||
"metadata_filled": [],
|
||||
"pages": page_count,
|
||||
}
|
||||
except Exception as e:
|
||||
logger.exception("ingest_document failed (%s): %s", spec.source_kind, e)
|
||||
await db.set_case_law_extraction_status(case_law_id, "failed")
|
||||
await progress("failed", 100, f"כשל בעיבוד: {e}")
|
||||
raise
|
||||
|
||||
|
||||
async def _chunk_embed_store(case_law_id, text, page_offsets, page_count, progress) -> int:
|
||||
"""Steps 7-8: chunk (hierarchical/flat by flag) → embed children → store."""
|
||||
if config.PARENT_DOC_RETRIEVAL_ENABLED:
|
||||
await progress("chunking", 40, f"מחלק את הטקסט ל-chunks היררכיים ({page_count} עמ')")
|
||||
h_chunks = chunker.chunk_document_hierarchical(text, page_offsets=page_offsets)
|
||||
if not h_chunks:
|
||||
return 0
|
||||
children = [c for c in h_chunks if c.role == "child"]
|
||||
parents = [c for c in h_chunks if c.role == "parent"]
|
||||
await progress("embedding", 55, f"מייצר embeddings ל-{len(children)} children ({len(parents)} parents)")
|
||||
child_vectors = await embeddings.embed_texts([c.content for c in children], input_type="document")
|
||||
chunk_dicts: list[dict] = []
|
||||
for p in parents:
|
||||
chunk_dicts.append({
|
||||
"role": "parent", "local_id": p.local_id, "parent_local_id": None,
|
||||
"chunk_index": p.chunk_index, "content": p.content,
|
||||
"section_type": p.section_type, "page_number": p.page_number, "embedding": None,
|
||||
})
|
||||
for c, v in zip(children, child_vectors):
|
||||
chunk_dicts.append({
|
||||
"role": "child", "local_id": c.local_id, "parent_local_id": c.parent_local_id,
|
||||
"chunk_index": c.chunk_index, "content": c.content,
|
||||
"section_type": c.section_type, "page_number": c.page_number, "embedding": v,
|
||||
})
|
||||
counts = await db.store_precedent_chunks_hierarchical(case_law_id, chunk_dicts)
|
||||
return counts["children"]
|
||||
else:
|
||||
await progress("chunking", 40, f"מחלק את הטקסט ל-chunks ({page_count} עמ')")
|
||||
chunks = chunker.chunk_document(text, page_offsets=page_offsets)
|
||||
if not chunks:
|
||||
return 0
|
||||
await progress("embedding", 55, f"מייצר embeddings ל-{len(chunks)} chunks")
|
||||
chunk_vectors = await embeddings.embed_texts([c.content for c in chunks], input_type="document")
|
||||
chunk_dicts = [
|
||||
{"chunk_index": c.chunk_index, "content": c.content,
|
||||
"section_type": c.section_type, "page_number": c.page_number, "embedding": v}
|
||||
for c, v in zip(chunks, chunk_vectors)
|
||||
]
|
||||
return await db.store_precedent_chunks(case_law_id, chunk_dicts)
|
||||
@@ -16,21 +16,19 @@ Judicial decisions (Supreme Court, Administrative Court) stay in external_upload
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import re
|
||||
import shutil
|
||||
from datetime import date
|
||||
from pathlib import Path
|
||||
from uuid import UUID, uuid4
|
||||
from uuid import UUID
|
||||
|
||||
from legal_mcp import config
|
||||
from legal_mcp.services import chunker, db, embeddings, extractor
|
||||
from legal_mcp.services import db, embeddings, ingest
|
||||
from legal_mcp.services.practice_area import derive_proceeding_type
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
INTERNAL_DECISIONS_DIR = Path(config.DATA_DIR) / "internal-decisions"
|
||||
|
||||
_VALID_DISTRICTS = {"", "ירושלים", "מרכז", "תל אביב", "צפון", "דרום", "ארצי"}
|
||||
_VALID_PRACTICE_AREAS = frozenset({"", "rishuy_uvniya", "betterment_levy", "compensation_197"})
|
||||
_VALID_DISTRICTS = frozenset({"", "ירושלים", "מרכז", "תל אביב", "צפון", "דרום", "ארצי"})
|
||||
|
||||
_COURT_TO_DISTRICT = [
|
||||
("ירושלים", "ירושלים"),
|
||||
@@ -45,24 +43,6 @@ _COURT_TO_DISTRICT = [
|
||||
]
|
||||
|
||||
|
||||
def _coerce_date(value) -> date | None:
|
||||
if value is None or value == "":
|
||||
return None
|
||||
if isinstance(value, date):
|
||||
return value
|
||||
if isinstance(value, str):
|
||||
try:
|
||||
return date.fromisoformat(value[:10])
|
||||
except ValueError:
|
||||
return None
|
||||
return None
|
||||
|
||||
|
||||
def _safe_filename(name: str) -> str:
|
||||
base = Path(name).name
|
||||
return re.sub(r"[^\w.\-+א-ת ]", "_", base) or f"internal-{uuid4().hex[:8]}"
|
||||
|
||||
|
||||
def _district_from_court(court: str) -> str:
|
||||
for keyword, district in _COURT_TO_DISTRICT:
|
||||
if keyword in court:
|
||||
@@ -70,6 +50,51 @@ def _district_from_court(court: str) -> str:
|
||||
return ""
|
||||
|
||||
|
||||
def _internal_validate(inputs: dict) -> None:
|
||||
if not (inputs.get("case_number") or "").strip():
|
||||
raise ValueError("case_number is required")
|
||||
|
||||
|
||||
def _internal_derive(inputs: dict) -> dict:
|
||||
district = (inputs.get("district") or "").strip() or _district_from_court(inputs.get("court") or "")
|
||||
proc = (inputs.get("proceeding_type") or "").strip() or derive_proceeding_type(
|
||||
appeal_subtype=inputs.get("appeal_subtype") or "", subject=inputs.get("case_name") or "",
|
||||
)
|
||||
return {"district": district, "proceeding_type": proc}
|
||||
|
||||
|
||||
async def _create_internal_record(**kw) -> dict:
|
||||
return await db.create_internal_committee_decision(
|
||||
case_number=kw["case_number"].strip(),
|
||||
case_name=kw["case_name"],
|
||||
full_text=kw["full_text"],
|
||||
court=(kw.get("court") or "").strip(),
|
||||
decision_date=kw.get("decision_date"),
|
||||
chair_name=(kw.get("chair_name") or "").strip(),
|
||||
district=kw.get("district", ""),
|
||||
practice_area=kw.get("practice_area", ""),
|
||||
appeal_subtype=(kw.get("appeal_subtype") or "").strip(),
|
||||
subject_tags=list(kw.get("subject_tags") or []),
|
||||
summary=(kw.get("summary") or "").strip(),
|
||||
is_binding=kw.get("is_binding", True),
|
||||
document_id=kw.get("document_id"),
|
||||
proceeding_type=kw.get("proceeding_type") or "ערר",
|
||||
)
|
||||
|
||||
|
||||
_INTERNAL_SPEC = ingest.IntakeSpec(
|
||||
source_kind="internal_committee",
|
||||
id_field="case_number",
|
||||
staging_root=INTERNAL_DECISIONS_DIR,
|
||||
staging_subdir=lambda inputs: (inputs.get("district") or "other"),
|
||||
validate=_internal_validate,
|
||||
enum_fields={"practice_area": _VALID_PRACTICE_AREAS, "district": _VALID_DISTRICTS},
|
||||
derive=_internal_derive,
|
||||
display_name_fallback="case_number",
|
||||
create_record=_create_internal_record,
|
||||
)
|
||||
|
||||
|
||||
async def ingest_internal_decision(
|
||||
*,
|
||||
case_number: str,
|
||||
@@ -86,141 +111,25 @@ async def ingest_internal_decision(
|
||||
file_path: str | Path | None = None,
|
||||
text: str | None = None,
|
||||
document_id: UUID | None = None,
|
||||
queue_halachot: bool = True,
|
||||
proceeding_type: str = "",
|
||||
) -> dict:
|
||||
"""Ingest an appeals-committee decision into the internal corpus.
|
||||
|
||||
Either file_path or text must be provided.
|
||||
If district is empty, it is inferred from court.
|
||||
If proceeding_type is empty, it is derived from appeal_subtype/case_name.
|
||||
Returns: {"status": "completed", "case_law_id": "...", "chunks": N}
|
||||
"""
|
||||
if not file_path and not text:
|
||||
raise ValueError("either file_path or text is required")
|
||||
if not case_number.strip():
|
||||
raise ValueError("case_number is required")
|
||||
|
||||
resolved_district = district.strip() or _district_from_court(court)
|
||||
resolved_proc = proceeding_type.strip() or derive_proceeding_type(
|
||||
appeal_subtype=appeal_subtype, subject=case_name,
|
||||
)
|
||||
|
||||
if file_path:
|
||||
src = Path(file_path)
|
||||
if not src.is_file():
|
||||
raise FileNotFoundError(f"file not found: {src}")
|
||||
dest_dir = INTERNAL_DECISIONS_DIR / (resolved_district or "other")
|
||||
dest_dir.mkdir(parents=True, exist_ok=True)
|
||||
staged = dest_dir / f"{uuid4().hex[:8]}_{_safe_filename(src.name)}"
|
||||
shutil.copy2(src, staged)
|
||||
raw_text, page_count, page_offsets = await extractor.extract_text(str(staged))
|
||||
raw_text = extractor.strip_nevo_preamble(raw_text or "").strip()
|
||||
if not raw_text:
|
||||
raise ValueError("no extractable text in file")
|
||||
else:
|
||||
raw_text = (text or "").strip()
|
||||
if not raw_text:
|
||||
raise ValueError("text is empty")
|
||||
page_count = 0
|
||||
page_offsets = None
|
||||
|
||||
record = await db.create_internal_committee_decision(
|
||||
case_number=case_number.strip(),
|
||||
case_name=(case_name.strip() or case_number.strip()),
|
||||
full_text=raw_text,
|
||||
court=court.strip(),
|
||||
decision_date=_coerce_date(decision_date),
|
||||
chair_name=chair_name.strip(),
|
||||
district=resolved_district,
|
||||
practice_area=practice_area,
|
||||
appeal_subtype=appeal_subtype.strip(),
|
||||
subject_tags=list(subject_tags or []),
|
||||
summary=summary.strip(),
|
||||
is_binding=is_binding,
|
||||
"""Ingest one appeals-committee decision. Thin wrapper over the canonical pipeline."""
|
||||
inputs = {
|
||||
"case_number": case_number, "case_name": case_name, "court": court,
|
||||
"decision_date": decision_date, "chair_name": chair_name, "district": district,
|
||||
"practice_area": practice_area, "appeal_subtype": appeal_subtype,
|
||||
"subject_tags": subject_tags, "summary": summary, "is_binding": is_binding,
|
||||
"proceeding_type": proceeding_type,
|
||||
}
|
||||
out = await ingest.ingest_document(
|
||||
_INTERNAL_SPEC, inputs=inputs, file_path=file_path, text=text,
|
||||
document_id=document_id,
|
||||
proceeding_type=resolved_proc,
|
||||
)
|
||||
case_law_id = UUID(str(record["id"]))
|
||||
|
||||
try:
|
||||
# Parent-doc retrieval (TaskMaster #48) — same gated branch as
|
||||
# ingest_precedent. Internal committee decisions are typically
|
||||
# longer than external court rulings (full transcript + ruling),
|
||||
# so the parent-doc benefit is even larger here.
|
||||
if config.PARENT_DOC_RETRIEVAL_ENABLED:
|
||||
h_chunks = chunker.chunk_document_hierarchical(
|
||||
raw_text, page_offsets=page_offsets,
|
||||
)
|
||||
if not h_chunks:
|
||||
await db.set_case_law_extraction_status(case_law_id, "completed")
|
||||
await db.set_case_law_halacha_status(case_law_id, "completed")
|
||||
return {"status": "completed", "case_law_id": str(case_law_id), "chunks": 0}
|
||||
children = [c for c in h_chunks if c.role == "child"]
|
||||
parents = [c for c in h_chunks if c.role == "parent"]
|
||||
child_vectors = await embeddings.embed_texts(
|
||||
[c.content for c in children], input_type="document",
|
||||
)
|
||||
chunk_dicts: list[dict] = []
|
||||
for p in parents:
|
||||
chunk_dicts.append({
|
||||
"role": "parent", "local_id": p.local_id, "parent_local_id": None,
|
||||
"chunk_index": p.chunk_index, "content": p.content,
|
||||
"section_type": p.section_type, "page_number": p.page_number,
|
||||
"embedding": None,
|
||||
})
|
||||
for c, v in zip(children, child_vectors):
|
||||
chunk_dicts.append({
|
||||
"role": "child", "local_id": c.local_id,
|
||||
"parent_local_id": c.parent_local_id,
|
||||
"chunk_index": c.chunk_index, "content": c.content,
|
||||
"section_type": c.section_type, "page_number": c.page_number,
|
||||
"embedding": v,
|
||||
})
|
||||
counts = await db.store_precedent_chunks_hierarchical(
|
||||
case_law_id, chunk_dicts,
|
||||
)
|
||||
stored = counts["children"]
|
||||
else:
|
||||
chunks = chunker.chunk_document(raw_text, page_offsets=page_offsets)
|
||||
if not chunks:
|
||||
await db.set_case_law_extraction_status(case_law_id, "completed")
|
||||
await db.set_case_law_halacha_status(case_law_id, "completed")
|
||||
return {"status": "completed", "case_law_id": str(case_law_id), "chunks": 0}
|
||||
|
||||
chunk_texts = [c.content for c in chunks]
|
||||
chunk_vectors = await embeddings.embed_texts(chunk_texts, input_type="document")
|
||||
chunk_dicts = [
|
||||
{
|
||||
"chunk_index": c.chunk_index,
|
||||
"content": c.content,
|
||||
"section_type": c.section_type,
|
||||
"page_number": c.page_number,
|
||||
"embedding": v,
|
||||
}
|
||||
for c, v in zip(chunks, chunk_vectors)
|
||||
]
|
||||
stored = await db.store_precedent_chunks(case_law_id, chunk_dicts)
|
||||
|
||||
await db.set_case_law_extraction_status(case_law_id, "completed")
|
||||
await db.set_case_law_halacha_status(case_law_id, "pending")
|
||||
if queue_halachot:
|
||||
await db.request_halacha_extraction(case_law_id)
|
||||
|
||||
return {
|
||||
"status": "completed",
|
||||
"case_law_id": str(case_law_id),
|
||||
"chunks": stored,
|
||||
"halachot_pending": True,
|
||||
}
|
||||
|
||||
except Exception:
|
||||
logger.exception("ingest_internal_decision failed for %s", case_number)
|
||||
await db.set_case_law_extraction_status(case_law_id, "failed")
|
||||
raise
|
||||
return {"status": out["status"], "case_law_id": out["case_law_id"],
|
||||
"chunks": out["chunks"], "halachot_pending": True}
|
||||
|
||||
|
||||
async def migrate_from_style_corpus(dry_run: bool = False, queue_halachot: bool = True) -> dict:
|
||||
async def migrate_from_style_corpus(dry_run: bool = False) -> dict:
|
||||
"""Re-index all style_corpus entries as searchable internal committee decisions.
|
||||
|
||||
Does NOT delete style_corpus rows — they remain for style analysis.
|
||||
@@ -278,7 +187,6 @@ async def migrate_from_style_corpus(dry_run: bool = False, queue_halachot: bool
|
||||
appeal_subtype=subtype,
|
||||
subject_tags=subject_tags,
|
||||
text=row["full_text"],
|
||||
queue_halachot=queue_halachot,
|
||||
)
|
||||
results["ingested"] += 1
|
||||
logger.info("Migrated style_corpus entry: %s", case_number)
|
||||
|
||||
@@ -15,15 +15,12 @@ from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import re
|
||||
import shutil
|
||||
from datetime import date
|
||||
from pathlib import Path
|
||||
from typing import Awaitable, Callable
|
||||
from uuid import UUID, uuid4
|
||||
from uuid import UUID
|
||||
|
||||
from legal_mcp import config
|
||||
from legal_mcp.services import chunker, db, embeddings, extractor, hybrid_search, rerank # noqa: F401
|
||||
from legal_mcp.services import db, embeddings, hybrid_search, ingest # noqa: F401
|
||||
|
||||
# Note: halacha_extractor and precedent_metadata_extractor are NOT imported
|
||||
# at module load. They are imported lazily inside the dedicated re-extract
|
||||
@@ -40,8 +37,8 @@ ProgressCb = Callable[[str, int, str], Awaitable[None]]
|
||||
PRECEDENT_LIBRARY_DIR = Path(config.DATA_DIR) / "precedent-library"
|
||||
|
||||
|
||||
_VALID_PRACTICE_AREAS = {"", "rishuy_uvniya", "betterment_levy", "compensation_197"}
|
||||
_VALID_SOURCE_TYPES = {"", "court_ruling", "appeals_committee"}
|
||||
_VALID_PRACTICE_AREAS = frozenset({"", "rishuy_uvniya", "betterment_levy", "compensation_197"})
|
||||
_VALID_SOURCE_TYPES = frozenset({"", "court_ruling", "appeals_committee"})
|
||||
_VALID_PRECEDENT_LEVELS = {
|
||||
"", "עליון", "מנהלי", "ועדת_ערר_ארצית", "ועדת_ערר_מחוזית",
|
||||
"supreme", "administrative", "national_appeals_committee", "district_appeals_committee",
|
||||
@@ -52,37 +49,54 @@ async def _noop_progress(_status: str, _percent: int, _msg: str) -> None:
|
||||
return None
|
||||
|
||||
|
||||
def _safe_filename(name: str) -> str:
|
||||
"""Strip path separators and unsafe chars from a user-provided name."""
|
||||
base = Path(name).name
|
||||
return re.sub(r"[^\w.\-+א-ת ]", "_", base) or f"upload-{uuid4().hex[:8]}"
|
||||
def _external_validate(inputs: dict) -> None:
|
||||
citation = (inputs.get("citation") or "").strip()
|
||||
if not citation:
|
||||
raise ValueError("citation is required")
|
||||
if citation.startswith(("ערר ", "ערר(", 'בל"מ ', 'בל"מ(', "ARAR ")):
|
||||
raise ValueError(
|
||||
"ציטוט שמתחיל ב-'ערר' או 'בל\"מ' הוא החלטת ועדת ערר. "
|
||||
"השתמש ב-internal_decision_upload (דורש chair_name + district), "
|
||||
"לא ב-precedent_library_upload."
|
||||
)
|
||||
|
||||
|
||||
def _stage_file(src_path: Path, source_type: str) -> Path:
|
||||
"""Copy the uploaded file into data/precedent-library/<source_type>/.
|
||||
|
||||
Returns the destination path. Source file is not deleted (caller decides).
|
||||
"""
|
||||
sub = source_type if source_type in {"court_ruling", "appeals_committee"} else "other"
|
||||
dest_dir = PRECEDENT_LIBRARY_DIR / sub
|
||||
dest_dir.mkdir(parents=True, exist_ok=True)
|
||||
safe_name = _safe_filename(src_path.name)
|
||||
dest = dest_dir / f"{uuid4().hex[:8]}_{safe_name}"
|
||||
shutil.copy2(src_path, dest)
|
||||
return dest
|
||||
def _external_staging_subdir(inputs: dict) -> str:
|
||||
st = inputs.get("source_type") or ""
|
||||
return st if st in {"court_ruling", "appeals_committee"} else "other"
|
||||
|
||||
|
||||
def _coerce_date(value) -> date | None:
|
||||
if value is None or value == "":
|
||||
return None
|
||||
if isinstance(value, date):
|
||||
return value
|
||||
if isinstance(value, str):
|
||||
try:
|
||||
return date.fromisoformat(value[:10])
|
||||
except ValueError:
|
||||
return None
|
||||
return None
|
||||
async def _create_external_record(**kw) -> dict:
|
||||
"""Adapter: maps canonical inputs (citation) to create_external_case_law(case_number)."""
|
||||
return await db.create_external_case_law(
|
||||
case_number=kw["citation"].strip(),
|
||||
case_name=kw["case_name"],
|
||||
full_text=kw["full_text"],
|
||||
court=(kw.get("court") or "").strip(),
|
||||
decision_date=kw.get("decision_date"),
|
||||
practice_area=kw.get("practice_area", ""),
|
||||
appeal_subtype=(kw.get("appeal_subtype") or "").strip(),
|
||||
subject_tags=list(kw.get("subject_tags") or []),
|
||||
summary=(kw.get("summary") or "").strip(),
|
||||
headnote=(kw.get("headnote") or "").strip(),
|
||||
source_type=kw.get("source_type", ""),
|
||||
precedent_level=kw.get("precedent_level", ""),
|
||||
is_binding=kw.get("is_binding", True),
|
||||
document_id=kw.get("document_id"),
|
||||
)
|
||||
|
||||
|
||||
_EXTERNAL_SPEC = ingest.IntakeSpec(
|
||||
source_kind="external_upload",
|
||||
id_field="citation",
|
||||
staging_root=PRECEDENT_LIBRARY_DIR,
|
||||
staging_subdir=_external_staging_subdir,
|
||||
validate=_external_validate,
|
||||
enum_fields={"practice_area": _VALID_PRACTICE_AREAS, "source_type": _VALID_SOURCE_TYPES},
|
||||
derive=lambda inputs: {},
|
||||
display_name_fallback="citation",
|
||||
create_record=_create_external_record,
|
||||
)
|
||||
|
||||
|
||||
async def ingest_precedent(
|
||||
@@ -101,220 +115,20 @@ async def ingest_precedent(
|
||||
headnote: str = "",
|
||||
summary: str = "",
|
||||
document_id: UUID | None = None,
|
||||
progress: ProgressCb | None = None,
|
||||
progress: ingest.ProgressCb | None = None,
|
||||
) -> dict:
|
||||
"""Ingest a single uploaded precedent through the full pipeline.
|
||||
|
||||
Required: file_path + citation. Everything else has a sensible default.
|
||||
|
||||
Returns:
|
||||
``{"status": "...", "case_law_id": "...", "chunks": N, "halachot": M}``
|
||||
"""
|
||||
progress = progress or _noop_progress
|
||||
src = Path(file_path)
|
||||
if not src.is_file():
|
||||
raise FileNotFoundError(f"file not found: {src}")
|
||||
if not citation.strip():
|
||||
raise ValueError("citation is required")
|
||||
# Citation guard at service level (catches both MCP and HTTP API paths).
|
||||
# Appeals-committee decisions must go through ingest_internal_decision
|
||||
# which records chair_name+district. The MCP wrapper has the same guard
|
||||
# for an earlier, friendlier error message — but this is the source of
|
||||
# truth. See TaskMaster #30(ב) and DB constraint case_law_external_arar_check.
|
||||
_norm = citation.strip()
|
||||
if _norm.startswith(("ערר ", "ערר(", "בל\"מ ", "בל\"מ(", "ARAR ")):
|
||||
raise ValueError(
|
||||
"ציטוט שמתחיל ב-'ערר' או 'בל\"מ' הוא החלטת ועדת ערר. "
|
||||
"השתמש ב-internal_decision_upload (דורש chair_name + district), "
|
||||
"לא ב-precedent_library_upload."
|
||||
)
|
||||
if practice_area not in _VALID_PRACTICE_AREAS:
|
||||
raise ValueError(f"invalid practice_area: {practice_area!r}")
|
||||
if source_type not in _VALID_SOURCE_TYPES:
|
||||
raise ValueError(f"invalid source_type: {source_type!r}")
|
||||
|
||||
await progress("staging", 5, "מעתיק את הקובץ לאחסון")
|
||||
|
||||
staged = _stage_file(src, source_type)
|
||||
|
||||
await progress("extracting", 15, "מחלץ טקסט מהקובץ")
|
||||
try:
|
||||
text, page_count, page_offsets = await extractor.extract_text(str(staged))
|
||||
except Exception as e:
|
||||
await progress("failed", 100, f"כשל בחילוץ טקסט: {e}")
|
||||
raise
|
||||
|
||||
text = (text or "").strip()
|
||||
if not text:
|
||||
await progress("failed", 100, "לא נמצא טקסט בקובץ")
|
||||
raise ValueError("no extractable text in file")
|
||||
|
||||
# Strip any Nevo preamble that might wrap court rulings downloaded from Nevo.
|
||||
text = extractor.strip_nevo_preamble(text)
|
||||
|
||||
await progress("storing_metadata", 25, "שומר את הפסיקה במסד הנתונים")
|
||||
record = await db.create_external_case_law(
|
||||
case_number=citation.strip(),
|
||||
case_name=case_name.strip() or citation.strip(),
|
||||
full_text=text,
|
||||
court=court.strip(),
|
||||
decision_date=_coerce_date(decision_date),
|
||||
practice_area=practice_area,
|
||||
appeal_subtype=appeal_subtype.strip(),
|
||||
subject_tags=list(subject_tags or []),
|
||||
summary=summary.strip(),
|
||||
headnote=headnote.strip(),
|
||||
source_type=source_type,
|
||||
precedent_level=precedent_level,
|
||||
is_binding=is_binding,
|
||||
document_id=document_id,
|
||||
"""Ingest one external precedent. Thin wrapper over the canonical pipeline."""
|
||||
inputs = {
|
||||
"citation": citation, "case_name": case_name, "court": court,
|
||||
"decision_date": decision_date, "source_type": source_type,
|
||||
"precedent_level": precedent_level, "practice_area": practice_area,
|
||||
"appeal_subtype": appeal_subtype, "subject_tags": subject_tags,
|
||||
"is_binding": is_binding, "headnote": headnote, "summary": summary,
|
||||
}
|
||||
return await ingest.ingest_document(
|
||||
_EXTERNAL_SPEC, inputs=inputs, file_path=file_path,
|
||||
document_id=document_id, progress=progress,
|
||||
)
|
||||
case_law_id = UUID(str(record["id"]))
|
||||
|
||||
try:
|
||||
# Parent-doc retrieval (TaskMaster #48): when enabled, emit
|
||||
# two tiers (parents + children). Only children are embedded
|
||||
# and indexed; parents carry retrieval context. When disabled,
|
||||
# fall back to legacy single-tier chunking — identical
|
||||
# behaviour to pre-V17.
|
||||
if config.PARENT_DOC_RETRIEVAL_ENABLED:
|
||||
await progress(
|
||||
"chunking", 40,
|
||||
f"מחלק את הטקסט ל-chunks היררכיים ({page_count} עמ')",
|
||||
)
|
||||
h_chunks = chunker.chunk_document_hierarchical(
|
||||
text, page_offsets=page_offsets,
|
||||
)
|
||||
if not h_chunks:
|
||||
await db.set_case_law_extraction_status(case_law_id, "completed")
|
||||
await db.set_case_law_halacha_status(case_law_id, "completed")
|
||||
await progress("completed", 100, "אין טקסט לעיבוד")
|
||||
return {
|
||||
"status": "completed",
|
||||
"case_law_id": str(case_law_id),
|
||||
"chunks": 0,
|
||||
"halachot": 0,
|
||||
}
|
||||
|
||||
children = [c for c in h_chunks if c.role == "child"]
|
||||
parents = [c for c in h_chunks if c.role == "parent"]
|
||||
await progress(
|
||||
"embedding", 55,
|
||||
f"מייצר embeddings ל-{len(children)} children "
|
||||
f"({len(parents)} parents)",
|
||||
)
|
||||
child_texts = [c.content for c in children]
|
||||
child_vectors = await embeddings.embed_texts(
|
||||
child_texts, input_type="document",
|
||||
)
|
||||
# Build flat dict list for the two-pass writer.
|
||||
chunk_dicts: list[dict] = []
|
||||
for p in parents:
|
||||
chunk_dicts.append({
|
||||
"role": "parent",
|
||||
"local_id": p.local_id,
|
||||
"parent_local_id": None,
|
||||
"chunk_index": p.chunk_index,
|
||||
"content": p.content,
|
||||
"section_type": p.section_type,
|
||||
"page_number": p.page_number,
|
||||
"embedding": None,
|
||||
})
|
||||
for c, v in zip(children, child_vectors):
|
||||
chunk_dicts.append({
|
||||
"role": "child",
|
||||
"local_id": c.local_id,
|
||||
"parent_local_id": c.parent_local_id,
|
||||
"chunk_index": c.chunk_index,
|
||||
"content": c.content,
|
||||
"section_type": c.section_type,
|
||||
"page_number": c.page_number,
|
||||
"embedding": v,
|
||||
})
|
||||
counts = await db.store_precedent_chunks_hierarchical(
|
||||
case_law_id, chunk_dicts,
|
||||
)
|
||||
stored_chunks = counts["children"]
|
||||
else:
|
||||
await progress(
|
||||
"chunking", 40, f"מחלק את הטקסט ל-chunks ({page_count} עמ')",
|
||||
)
|
||||
chunks = chunker.chunk_document(text, page_offsets=page_offsets)
|
||||
if not chunks:
|
||||
await db.set_case_law_extraction_status(case_law_id, "completed")
|
||||
await db.set_case_law_halacha_status(case_law_id, "completed")
|
||||
await progress("completed", 100, "אין טקסט לעיבוד")
|
||||
return {
|
||||
"status": "completed",
|
||||
"case_law_id": str(case_law_id),
|
||||
"chunks": 0,
|
||||
"halachot": 0,
|
||||
}
|
||||
|
||||
await progress("embedding", 55, f"מייצר embeddings ל-{len(chunks)} chunks")
|
||||
chunk_texts = [c.content for c in chunks]
|
||||
chunk_vectors = await embeddings.embed_texts(chunk_texts, input_type="document")
|
||||
|
||||
chunk_dicts = [
|
||||
{
|
||||
"chunk_index": c.chunk_index,
|
||||
"content": c.content,
|
||||
"section_type": c.section_type,
|
||||
"page_number": c.page_number,
|
||||
"embedding": v,
|
||||
}
|
||||
for c, v in zip(chunks, chunk_vectors)
|
||||
]
|
||||
stored_chunks = await db.store_precedent_chunks(case_law_id, chunk_dicts)
|
||||
|
||||
# Multimodal page-image embeddings (V9). Gated by feature flag.
|
||||
# Non-fatal: text path already succeeded. Only PDFs.
|
||||
if config.MULTIMODAL_ENABLED and page_count > 0 and staged.suffix.lower() == ".pdf":
|
||||
try:
|
||||
await progress(
|
||||
"embedding_images", 70,
|
||||
f"מטמיע {page_count} עמודי תמונה (multimodal)",
|
||||
)
|
||||
await _embed_precedent_pages(case_law_id, staged, page_count)
|
||||
except Exception as e:
|
||||
logger.warning("Precedent multimodal embedding failed (non-fatal): %s", e)
|
||||
|
||||
# Pipeline split: the container does the non-LLM half (extract +
|
||||
# chunk + embed + store). LLM-driven extraction (metadata, halachot)
|
||||
# runs separately via the MCP tool `precedent_process_pending` from
|
||||
# local Claude Code, where `claude` CLI is available.
|
||||
#
|
||||
# We auto-queue both extractions so the chair doesn't need to click
|
||||
# any button — the moment they (or me) run `precedent_process_pending`
|
||||
# in chat, both kinds get processed.
|
||||
await db.set_case_law_extraction_status(case_law_id, "completed")
|
||||
await db.set_case_law_halacha_status(case_law_id, "pending")
|
||||
await db.request_metadata_extraction(case_law_id)
|
||||
await db.request_halacha_extraction(case_law_id)
|
||||
|
||||
await progress(
|
||||
"completed",
|
||||
100,
|
||||
f"הוכנס לספרייה: {stored_chunks} chunks. "
|
||||
f"חילוץ הלכות ומטא-דאטה ממתינים בתור — "
|
||||
f"להפעיל מ-Claude Code: precedent_process_pending.",
|
||||
)
|
||||
|
||||
return {
|
||||
"status": "completed",
|
||||
"case_law_id": str(case_law_id),
|
||||
"chunks": stored_chunks,
|
||||
"halachot": 0,
|
||||
"halachot_pending": True,
|
||||
"metadata_filled": [],
|
||||
"pages": page_count,
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.exception("precedent_library.ingest_precedent failed: %s", e)
|
||||
await db.set_case_law_extraction_status(case_law_id, "failed")
|
||||
await progress("failed", 100, f"כשל בעיבוד: {e}")
|
||||
raise
|
||||
|
||||
|
||||
async def reextract_halachot(
|
||||
@@ -586,48 +400,3 @@ async def search_library(
|
||||
subject_tag=subject_tag,
|
||||
include_halachot=include_halachot,
|
||||
)
|
||||
|
||||
|
||||
async def _embed_precedent_pages(
|
||||
case_law_id: UUID,
|
||||
pdf_path: Path,
|
||||
page_count: int,
|
||||
) -> dict:
|
||||
"""Render precedent PDF pages → embed via voyage-multimodal → store.
|
||||
|
||||
Thumbnails go to
|
||||
``data/precedent-library/thumbnails/{case_law_id}/p{N:03d}.jpg``.
|
||||
"""
|
||||
thumb_dir = PRECEDENT_LIBRARY_DIR / "thumbnails" / str(case_law_id)
|
||||
rendered = await asyncio.to_thread(
|
||||
extractor.render_pages_for_multimodal,
|
||||
pdf_path,
|
||||
config.MULTIMODAL_DPI,
|
||||
config.MULTIMODAL_THUMB_DPI,
|
||||
thumb_dir,
|
||||
)
|
||||
images = [pil for pil, _ in rendered]
|
||||
thumbs = [t for _, t in rendered]
|
||||
img_embs = await embeddings.embed_images(images)
|
||||
|
||||
page_records = []
|
||||
for i, (emb, thumb) in enumerate(zip(img_embs, thumbs)):
|
||||
rel_thumb = None
|
||||
if thumb is not None:
|
||||
try:
|
||||
rel_thumb = str(thumb.relative_to(config.DATA_DIR))
|
||||
except ValueError:
|
||||
rel_thumb = str(thumb)
|
||||
page_records.append({
|
||||
"page_number": i + 1,
|
||||
"embedding": emb,
|
||||
"image_thumbnail_path": rel_thumb,
|
||||
})
|
||||
stored = await db.store_precedent_image_embeddings(
|
||||
case_law_id, page_records, model_name=config.MULTIMODAL_MODEL,
|
||||
)
|
||||
logger.info(
|
||||
"Multimodal: stored %d page-image embeddings for case_law %s",
|
||||
stored, case_law_id,
|
||||
)
|
||||
return {"pages_embedded": stored}
|
||||
|
||||
169
mcp-server/tests/test_unified_ingest.py
Normal file
169
mcp-server/tests/test_unified_ingest.py
Normal file
@@ -0,0 +1,169 @@
|
||||
"""FU-1: unified ingest pipeline tests (offline, all I/O monkeypatched).
|
||||
|
||||
Proves both intake types flow through services.ingest.ingest_document and that
|
||||
the canonical pipeline is symmetric: BOTH metadata and halacha extraction are
|
||||
queued for BOTH types (GAP-02 regression), enum validation applies to both
|
||||
(GAP-04), multimodal is gated by flag+PDF not by intake type (GAP-05), and the
|
||||
external citation guard is preserved.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from pathlib import Path
|
||||
from uuid import uuid4
|
||||
|
||||
import pytest
|
||||
|
||||
from legal_mcp import config
|
||||
from legal_mcp.services import db, embeddings, chunker, extractor
|
||||
from legal_mcp.services import ingest, precedent_library, internal_decisions
|
||||
|
||||
|
||||
def _run(coro):
|
||||
return asyncio.run(coro)
|
||||
|
||||
|
||||
class _Chunk:
|
||||
def __init__(self, i):
|
||||
self.chunk_index = i
|
||||
self.content = f"chunk-{i}"
|
||||
self.section_type = "body"
|
||||
self.page_number = 1
|
||||
self.role = "child"
|
||||
self.local_id = f"c{i}"
|
||||
self.parent_local_id = None
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def patched(monkeypatch, tmp_path):
|
||||
"""Patch every I/O boundary. Record queue + create calls."""
|
||||
calls = {"metadata": [], "halacha": [], "create": [], "chunks": [], "pages": []}
|
||||
|
||||
async def _extract_text(path):
|
||||
return ("full decision text", 2, [0, 100])
|
||||
|
||||
def _strip(text):
|
||||
return text
|
||||
|
||||
def _chunk(text, page_offsets=None):
|
||||
return [_Chunk(0), _Chunk(1)]
|
||||
|
||||
async def _embed(texts, input_type="document"):
|
||||
return [[0.0] * 8 for _ in texts]
|
||||
|
||||
async def _store_chunks(cid, dicts):
|
||||
calls["chunks"].append((cid, len(dicts)))
|
||||
return len(dicts)
|
||||
|
||||
async def _create_external(**kw):
|
||||
calls["create"].append(("external", kw))
|
||||
return {"id": uuid4()}
|
||||
|
||||
async def _create_internal(**kw):
|
||||
calls["create"].append(("internal", kw))
|
||||
return {"id": uuid4()}
|
||||
|
||||
async def _req_meta(cid):
|
||||
calls["metadata"].append(cid)
|
||||
|
||||
async def _req_hal(cid):
|
||||
calls["halacha"].append(cid)
|
||||
|
||||
async def _set_status(cid, status):
|
||||
return None
|
||||
|
||||
monkeypatch.setattr(extractor, "extract_text", _extract_text)
|
||||
monkeypatch.setattr(extractor, "strip_nevo_preamble", _strip)
|
||||
monkeypatch.setattr(chunker, "chunk_document", _chunk)
|
||||
monkeypatch.setattr(embeddings, "embed_texts", _embed)
|
||||
monkeypatch.setattr(db, "store_precedent_chunks", _store_chunks)
|
||||
monkeypatch.setattr(db, "create_external_case_law", _create_external)
|
||||
monkeypatch.setattr(db, "create_internal_committee_decision", _create_internal)
|
||||
monkeypatch.setattr(db, "request_metadata_extraction", _req_meta)
|
||||
monkeypatch.setattr(db, "request_halacha_extraction", _req_hal)
|
||||
monkeypatch.setattr(db, "set_case_law_extraction_status", _set_status)
|
||||
monkeypatch.setattr(db, "set_case_law_halacha_status", _set_status)
|
||||
# Force flat chunking + multimodal OFF unless a test flips it.
|
||||
monkeypatch.setattr(config, "PARENT_DOC_RETRIEVAL_ENABLED", False)
|
||||
monkeypatch.setattr(config, "MULTIMODAL_ENABLED", False)
|
||||
return calls
|
||||
|
||||
|
||||
def _make_pdf(tmp_path) -> str:
|
||||
p = tmp_path / "decision.pdf"
|
||||
p.write_bytes(b"%PDF-1.4 fake")
|
||||
return str(p)
|
||||
|
||||
|
||||
def test_internal_queues_BOTH_metadata_and_halacha(patched, tmp_path):
|
||||
"""GAP-02 regression: the internal path must queue metadata too."""
|
||||
_run(internal_decisions.ingest_internal_decision(
|
||||
case_number="8046/24", text="decision text", chair_name="דפנה תמיר",
|
||||
district="ירושלים", practice_area="betterment_levy",
|
||||
))
|
||||
assert len(patched["metadata"]) == 1, "internal path must queue metadata (GAP-02)"
|
||||
assert len(patched["halacha"]) == 1
|
||||
|
||||
|
||||
def test_external_queues_both(patched, tmp_path):
|
||||
_run(precedent_library.ingest_precedent(
|
||||
file_path=_make_pdf(tmp_path), citation="עע\"מ 1234/20",
|
||||
practice_area="rishuy_uvniya", source_type="court_ruling",
|
||||
))
|
||||
assert len(patched["metadata"]) == 1
|
||||
assert len(patched["halacha"]) == 1
|
||||
|
||||
|
||||
def test_both_types_go_through_ingest_document(patched, tmp_path, monkeypatch):
|
||||
seen = []
|
||||
real = ingest.ingest_document
|
||||
|
||||
async def _spy(spec, **kw):
|
||||
seen.append(spec.source_kind)
|
||||
return await real(spec, **kw)
|
||||
|
||||
monkeypatch.setattr(ingest, "ingest_document", _spy)
|
||||
_run(internal_decisions.ingest_internal_decision(
|
||||
case_number="8046/24", text="t", chair_name="דפנה תמיר", practice_area="betterment_levy"))
|
||||
_run(precedent_library.ingest_precedent(
|
||||
file_path=_make_pdf(tmp_path), citation="עע\"מ 1/20", practice_area="rishuy_uvniya"))
|
||||
assert seen == ["internal_committee", "external_upload"]
|
||||
|
||||
|
||||
def test_enum_validation_rejects_bad_practice_area_internal(patched, tmp_path):
|
||||
"""GAP-04: internal path must validate enums like the external one."""
|
||||
with pytest.raises(ValueError, match="practice_area"):
|
||||
_run(internal_decisions.ingest_internal_decision(
|
||||
case_number="8046/24", text="t", chair_name="x", practice_area="bogus"))
|
||||
|
||||
|
||||
def test_enum_validation_rejects_bad_practice_area_external(patched, tmp_path):
|
||||
with pytest.raises(ValueError, match="practice_area"):
|
||||
_run(precedent_library.ingest_precedent(
|
||||
file_path=_make_pdf(tmp_path), citation="עע\"מ 1/20", practice_area="bogus"))
|
||||
|
||||
|
||||
def test_external_citation_guard_still_blocks_arar(patched, tmp_path):
|
||||
with pytest.raises(ValueError, match="ערר"):
|
||||
_run(precedent_library.ingest_precedent(
|
||||
file_path=_make_pdf(tmp_path), citation="ערר 1234/24"))
|
||||
|
||||
|
||||
def test_internal_text_path_works_without_file(patched):
|
||||
out = _run(internal_decisions.ingest_internal_decision(
|
||||
case_number="8046/24", text="t", chair_name="x", practice_area="betterment_levy"))
|
||||
assert out["status"] == "completed"
|
||||
assert out["case_law_id"]
|
||||
|
||||
|
||||
def test_internal_requires_file_or_text(patched):
|
||||
with pytest.raises(ValueError, match="file_path or text"):
|
||||
_run(internal_decisions.ingest_internal_decision(
|
||||
case_number="8046/24", chair_name="x", practice_area="betterment_levy"))
|
||||
|
||||
|
||||
def test_display_name_fallback_uses_canonical_id(patched, tmp_path):
|
||||
_run(internal_decisions.ingest_internal_decision(
|
||||
case_number="8046/24", text="t", chair_name="x", practice_area="betterment_levy"))
|
||||
kind, kw = patched["create"][0]
|
||||
assert kw["case_name"] == "8046/24", "missing case_name falls back to canonical id"
|
||||
Reference in New Issue
Block a user