From 357a5238c49af19918ab55df183805892a02e1ab Mon Sep 17 00:00:00 2001 From: Chaim Date: Sat, 30 May 2026 19:00:30 +0000 Subject: [PATCH 1/9] docs(spec): FU-1 unified-ingest design + FU-3 backfill task (#61.2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Design for unifying the two parallel ingest paths (ingest_precedent / ingest_internal_decision) into one canonical pipeline parameterized by an IntakeSpec config object — Template Method skeleton + Strategy injection. Closes the GAP-02 root cause (missing metadata queue on internal path) by making a skipped step structurally impossible. Architecture choice verified against 3+ authoritative sources (refactoring.guru Template-Method/Replace-Conditional, Fowler FlagArgument, Strategy pattern). DB check (2026-05-30): no migration needed — 0/56 internal rows lack metadata, 0 invalid enums; multimodal backfill (42 rows) tracked as TaskMaster #61.2 / FU-3. Covers GAP-01/02/04/05 · provides INV-ING1/ING3/G2/G4 · TaskMaster #59. Co-Authored-By: Claude Opus 4.8 (1M context) --- .taskmaster/tasks/tasks.json | 12 ++ .../2026-05-30-fu1-unified-ingest-design.md | 141 ++++++++++++++++++ 2 files changed, 153 insertions(+) create mode 100644 docs/superpowers/specs/2026-05-30-fu1-unified-ingest-design.md diff --git a/.taskmaster/tasks/tasks.json b/.taskmaster/tasks/tasks.json index 96ba304..f03f2d0 100644 --- a/.taskmaster/tasks/tasks.json +++ b/.taskmaster/tasks/tasks.json @@ -2140,6 +2140,18 @@ "status": "pending", "testStrategy": "", "parentId": "61" + }, + { + "id": 2, + "title": "[backfill] multimodal page-images ל-42 החלטות-ועדה קיימות", + "description": "42/56 רשומות source_kind='internal_committee' נקלטו במסלול הישן בלי multimodal page-images (FU-1 מתקן רק קדימה). אחרי שמנגנון ה-re-index של FU-3 קיים — להריץ re-embed של עמודי-תמונה עליהן. ⚠️ קודם לכמת כמה מה-42 הן PDF-backed (לרשומות שנקלטו מ-text בלבד אין קובץ → אי-אפשר להטמיע עמודים). רק PDF-backed רלוונטיות.", + "dependencies": [ + 1 + ], + "details": "מקור: בדיקת DB 2026-05-30 (precedent_image_embeddings JOIN case_law). internal_committee: 14/56 עם page-images, 42 בלי. נגזר מ-GAP-02/FU-1 boundary discussion. לא פער-תקינות — שיפור multimodal coverage.", + "status": "pending", + "testStrategy": "", + "parentId": "61" } ], "updatedAt": "2026-05-30T17:37:34.741136+00:00" diff --git a/docs/superpowers/specs/2026-05-30-fu1-unified-ingest-design.md b/docs/superpowers/specs/2026-05-30-fu1-unified-ingest-design.md new file mode 100644 index 0000000..7ed9917 --- /dev/null +++ b/docs/superpowers/specs/2026-05-30-fu1-unified-ingest-design.md @@ -0,0 +1,141 @@ +# FU-1 — איחוד מסלול-הקליטה (Unified Ingest Path) — עיצוב + +**סטטוס:** מאושר-לעיצוב · **תאריך:** 2026-05-30 · **ענף:** TBD (ייפתח בביצוע) +**מכסה:** GAP-01, GAP-02, GAP-04, GAP-05 · **מספק:** INV-ING1, INV-ING3, INV-G2, INV-G4 +**מקורות:** [docs/spec/01-ingest.md](../../spec/01-ingest.md), [docs/spec/gap-audit.md](../../spec/gap-audit.md) · **משימה:** TaskMaster #59 (legal-ai) +**סוג-עבודה:** pure-code · **מיגרציה:** אין (אומת מול DB 2026-05-30 — ראה §6) + +--- + +## 1. הבעיה + +שתי פונקציות-קליטה מקבילות לישויות-אחיות, שמשכפלות את צעדי 2–10 של הפייפליין ומתפצלות +בפרטים: + +- `services/precedent_library.py::ingest_precedent` (פסיקה חיצונית, `source_kind='external_upload'`) +- `services/internal_decisions.py::ingest_internal_decision` (החלטות-ועדה, `source_kind='internal_committee'`) + +מסלולים מקבילים גוררים drift — צעד שקיים באחד וחסר באחר. הביטוי הקונקרטי: **GAP-02** — +המסלול הפנימי מתזמן רק `request_halacha_extraction` ולא `request_metadata_extraction`, +ולכן החלטות-ועדה נקלטו בלי metadata. שש אסימטריות נוספות (GAP-01/04/05) מתועדות ב-01-ingest §4. + +## 2. ההכרעה האדריכלית (מאומתת) + +**Template Method skeleton + Strategy via config object.** פונקציה קנונית אחת מריצה את +שלד-הפייפליין (סדר-צעדים אכיף); כל מה שמשתנה לפי סוג נישא ב-config object מוזרק (`IntakeSpec`). +שתי הפונקציות הציבוריות נשמרות כ-API בעל-שם ומאצילות לליבה. + +| החלטה | נימוק | מקורות (≥3) | +|-------|--------|-------------| +| מסלול קנוני יחיד (לא 2 מקבילים, לא ליבה-משותפת בין 2 כניסות) | Template Method: "אלגוריתמים כמעט-זהים עם הבדלים קטנים" → שלד אחד אוכף סדר-צעדים, צעד-חסר נעשה בלתי-אפשרי | refactoring.guru (Template Method); SourceMaking (Strategy); ADF parameterized-pipelines | +| שמירת `ingest_precedent`/`ingest_internal_decision` כ-API ציבורי | Fowler FlagArgument: "separate methods communicate more clearly"; ליבה משותפת מוסתרת כשהלוגיקה שזורה | martinfowler.com/bliki/FlagArgument; ardalis; luzkan smells | +| ווריאציה ב-config object (`IntakeSpec`), לא boolean-flags | flag-argument הוא code smell; config object נותן בהירות + הרחבה | Fowler; ardalis; dev.to flag-anti-pattern | +| `validate` כ-callable, `enum_fields` כ-data | callable להטרוגני (Strategy idiomatic ב-Python first-class), data להומוגני ("אל תכפה הכל ל-strategy") | Strategy/Wikipedia; Functional-Strategy dev.to; Strategy-in-Python Medium | +| `create_record` כ-callable מוזרק, לא `if source_kind` | Replace Conditional with Polymorphism + factory injection ("tell, don't ask") | refactoring.guru (Replace-Conditional); code-maze (Factory+DI); c-sharpcorner | + +## 3. מבנה מודולים + +**מודול חדש:** `mcp-server/src/legal_mcp/services/ingest.py` + +``` +services/ingest.py ← חדש (בית המסלול הקנוני) +├── IntakeSpec (frozen dataclass — מתאר-הסוג) +├── async ingest_document(spec, *, file_path|text, inputs, progress) ← ליבה: צעדים 1–10 +├── _stage_file(src, root, subdir) (אחיד — מאוחד משני הקבצים) +├── _coerce_date / _safe_filename (אחיד — היום משוכפל) +└── _embed_pages(case_law_id, pdf, n) (עובר מ-precedent_library.py — צעד 7 אחיד) +``` + +**API ציבורי — חתימה ללא שינוי לקוראים:** +- `precedent_library.py::ingest_precedent(...)` → בונה `_EXTERNAL_SPEC`, קורא `ingest.ingest_document(...)`. +- `internal_decisions.py::ingest_internal_decision(...)` → בונה `_INTERNAL_SPEC`, קורא `ingest.ingest_document(...)`. + +**לא זז (גבול FU-2):** `db.create_external_case_law` / `db.create_internal_committee_decision` +נשארות נפרדות; מנותבות דרך `IntakeSpec.create_record`. כל שאר הפונקציות בשני קבצי-השירות +(search_*, migrate_*, reextract_*, process_pending_extractions, enrich_*) **לא נוגעים בהן**. + +**הקוראים שלא משתנים:** MCP tools (`tools/precedent_library.py`, `tools/internal_decisions.py`) +וה-HTTP API ב-`web/` ממשיכים לקרוא לאותן שתי פונקציות ציבוריות. + +## 4. ה-IntakeSpec + +```python +@dataclass(frozen=True) +class IntakeSpec: + source_kind: str # 'external_upload' | 'internal_committee' + id_field: str # 'citation' | 'case_number' (לוג/שגיאות) + staging_root: Path # PRECEDENT_LIBRARY_DIR | INTERNAL_DECISIONS_DIR + staging_subdir: Callable[[dict], str] # inputs → subdir (source_type | district | 'other') + validate: Callable[[dict], None] # מרים ValueError (citation-guard / chair_name-חובה) + enum_fields: dict[str, frozenset[str]] # נאכף לשני הסוגים (GAP-04) + derive: Callable[[dict], dict] # שדות-נגזרים (district, proceeding_type); identity לחיצוני + display_name_fallback: str # שם-השדה כשחסר case_name ('citation'|'case_number') + create_record: Callable[..., Awaitable[dict]] # create_external_case_law | create_internal_committee_decision +``` + +הליבה `ingest_document` **לא יודעת** איזה סוג רץ — רק מפעילה את ה-hooks בנקודות מוגדרות. + +## 5. הפייפליין הקנוני (צעדים 1–10, לפי 01-ingest §2) + +סדר-הביצוע בפועל (ה-DB-create מוקדם — נדרש `case_law_id` לפני אחסון chunks; תואם את הקוד הקיים): + +| # | צעד | אחיד? | מקור-וריאציה | +|---|------|-------|---------------| +| 1 | ולידציית-קלט + enums | מנגנון אחיד | `spec.validate` + `spec.enum_fields` | +| 2 | גזירת-שדות | מנגנון אחיד | `spec.derive` (identity לחיצוני) | +| 3 | Stage file | מנגנון אחיד | `spec.staging_root` + `spec.staging_subdir` | +| 4 | Extract text (טקסט-ריק = כשל מדווח) | ✅ מלא | — (internal גם מקבל `text` ישיר, בלי קובץ) | +| 5 | Strip Nevo preamble | ✅ מלא | — | +| 6 | **DB create → `case_law_id`** (ספציפי-לסוג) | מנותב | `spec.create_record` (+ `display_name_fallback`) | +| 7 | Chunk (hierarchical/flat לפי `PARENT_DOC_RETRIEVAL_ENABLED`) | ✅ מלא | — (flag, לא סוג) | +| 8 | Embed children + Store chunks | ✅ מלא | — | +| 9 | **Multimodal page-image embed** (flag+PDF+page_count>0) | ✅ מלא | — (**GAP-05 fix**: היה רק בחיצוני) | +| 10 | **Queue metadata extraction** | ✅ מלא | — (**GAP-02 fix**: היה רק בחיצוני) | +| 11 | Queue halacha extraction | ✅ מלא | — | +| 12 | Set statuses (extraction=completed, halacha=pending) | ✅ מלא | — | + +> הערה: 01-ingest §2 ממספר 1–10 בלי למנות מפורשות את ה-DB-create; כאן הוא צעד 6 כי הוא קודם +> ל-chunking בקוד בפועל. שגיאת-עיבוד אחרי create → `extraction_status=failed` (כמו היום). + +**אילוץ `claude_session`:** הליבה רק **מתזמנת** (`request_*_extraction` — כתיבת-DB טהורה). +אין import של `halacha_extractor`/`precedent_metadata_extractor` במסלול-הקליטה — נשמר כפי שהיום. + +## 6. שינויי-התנהגות וסיכון + +| שינוי | השפעה | סיכון | +|--------|--------|--------| +| **GAP-02**: internal עכשיו מתזמן metadata | החלטות-ועדה חדשות יקבלו headnote/summary/tags | אין — תיקון; רשומות קיימות כבר מלאות (0/56 חסר) | +| **GAP-04**: ולידציית-enums על internal | קלט עם practice_area לא-חוקי יידחה בקליטה | נמוך — כל 56 הקיימות חוקיות; בודקים שקוראי-internal מעבירים ערכים חוקיים | +| **GAP-05 multimodal**: internal PDF עכשיו מטמיע עמודים | החלטות-ועדה חדשות PDF יקבלו page-images | אין (non-fatal); קיימות → backfill ב-**TaskMaster 61.2 (FU-3)** | +| **GAP-05 fallback/staging/derive/guard**: מאוחדים | התנהגות זהה, מסלול אחד | אין — citation-guard נשמר ב-`_EXTERNAL_SPEC.validate` | + +**אין מיגרציה (אומת מול DB 2026-05-30):** internal_committee = 56 רשומות; metadata חסר = **0**; +enums לא-חוקיים = **0**; multimodal: 14/56 יש (42 חסר → FU-3 #61.2). הריפקטור משנה רק התנהגות +*קדימה*; אינו נוגע בנתונים שמורים. + +## 7. אסטרטגיית בדיקה + +pytest offline עם monkeypatch לכל גבולות-ה-I/O (db, embeddings, chunker, extractor) — כתבנית +[tests/test_search_domain_scope.py](../../../mcp-server/tests/test_search_domain_scope.py) +ו-[tests/test_precedent_corpus_isolation.py](../../../mcp-server/tests/test_precedent_corpus_isolation.py). +קובץ חדש: `mcp-server/tests/test_unified_ingest.py`. רץ עם `.venv` המקומי. + +מקרי-בדיקה (TDD — נכשלים לפני, עוברים אחרי): +1. **regression GAP-02** — `ingest_internal_decision` מתזמן גם metadata **וגם** halacha (לוכד את הבאג המקורי). +2. שני הסוגים זורמים דרך `ingest.ingest_document` (לא דרך גוף-קוד נפרד). +3. ולידציית-enum דוחה `practice_area` לא-חוקי בשני הסוגים (GAP-04). +4. citation-guard עדיין חוסם ציטוט `ערר`/`בל"מ` במסלול החיצוני. +5. staging-subdir נפתר נכון (source_type לחיצוני, district לפנימי, 'other' ל-fallback). +6. מסלול-`text` (פנימי, בלי קובץ) ומסלול-`file_path` שניהם עובדים. +7. multimodal מותנה flag+PDF+page_count — **לא** בסוג-ה-intake; PDF פנימי → מטמיע, text → לא. +8. fallback לשם-תצוגה: חסר case_name → נופל למזהה הקנוני הנכון לכל סוג. +9. אידמפוטנטיות-חתימה: ערכי-החזרה של שתי הפונקציות הציבוריות נשמרים (תאימות-קוראים). + +## 8. סדר-ביצוע + +1. כתיבת `test_unified_ingest.py` (אדום). +2. `services/ingest.py` — `IntakeSpec` + `ingest_document` + הזזת `_embed_pages` + helpers אחידים. +3. `_EXTERNAL_SPEC` + צמצום `ingest_precedent` ל-wrapper. +4. `_INTERNAL_SPEC` + צמצום `ingest_internal_decision` ל-wrapper. +5. הרצת הבדיקות (ירוק) + lint. +6. בדיקת-עשן: import של שני קבצי-השירות + ה-MCP tools (ללא שבירת חתימות). -- 2.49.1 From 15f42bc91c438f789bd180e28b749fc4d2778549 Mon Sep 17 00:00:00 2001 From: Chaim Date: Sat, 30 May 2026 19:05:14 +0000 Subject: [PATCH 2/9] docs(plan): FU-1 unified-ingest implementation plan (6 tasks, TDD) Co-Authored-By: Claude Opus 4.8 (1M context) --- .../plans/2026-05-30-fu1-unified-ingest.md | 833 ++++++++++++++++++ 1 file changed, 833 insertions(+) create mode 100644 docs/superpowers/plans/2026-05-30-fu1-unified-ingest.md diff --git a/docs/superpowers/plans/2026-05-30-fu1-unified-ingest.md b/docs/superpowers/plans/2026-05-30-fu1-unified-ingest.md new file mode 100644 index 0000000..bf3c3c0 --- /dev/null +++ b/docs/superpowers/plans/2026-05-30-fu1-unified-ingest.md @@ -0,0 +1,833 @@ +# FU-1 Unified Ingest Path — Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Collapse the two parallel ingest functions (`ingest_precedent`, `ingest_internal_decision`) into one canonical pipeline parameterized by an `IntakeSpec`, closing GAP-01/02/04/05. + +**Architecture:** New module `services/ingest.py` holds a Template-Method skeleton `ingest_document(spec, ...)`; per-type variation rides on a frozen `IntakeSpec` config object (staging resolver, validate callable, enum_fields data, derive callable, display-name fallback, injected `create_record`). The two existing public functions stay as named entry points that build a spec and delegate. The DB-create functions are NOT merged (FU-2 boundary) — only routed via `spec.create_record`. + +**Tech Stack:** Python 3.12, asyncpg, pytest (offline, monkeypatched I/O), local `.venv` at `mcp-server/.venv`. + +**Spec:** [docs/superpowers/specs/2026-05-30-fu1-unified-ingest-design.md](../specs/2026-05-30-fu1-unified-ingest-design.md) + +**Run tests with:** `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_unified_ingest.py -v` + +--- + +## File Structure + +- **Create** `mcp-server/src/legal_mcp/services/ingest.py` — canonical pipeline + `IntakeSpec` + shared helpers (`_stage_file`, `_coerce_date`, `_safe_filename`, `_embed_pages`). +- **Create** `mcp-server/tests/test_unified_ingest.py` — offline behavioral tests. +- **Modify** `mcp-server/src/legal_mcp/services/precedent_library.py` — `ingest_precedent` becomes a thin wrapper building `_EXTERNAL_SPEC`; delete inline pipeline + moved helpers; keep everything else (search, reextract, process_pending, list, delete, get). +- **Modify** `mcp-server/src/legal_mcp/services/internal_decisions.py` — `ingest_internal_decision` becomes a thin wrapper building `_INTERNAL_SPEC`; delete inline pipeline + moved helpers; keep migrate_*, enrich_*, search_internal. + +**Unchanged callers (verify, don't edit):** `tools/precedent_library.py`, `tools/internal_decisions.py`, `web/` HTTP handlers — they call the two public functions whose signatures are preserved. + +--- + +## Task 1: Failing tests for the unified pipeline + +**Files:** +- Test: `mcp-server/tests/test_unified_ingest.py` + +- [ ] **Step 1: Write the failing tests** + +```python +"""FU-1: unified ingest pipeline tests (offline, all I/O monkeypatched). + +Proves both intake types flow through services.ingest.ingest_document and that +the canonical pipeline is symmetric: BOTH metadata and halacha extraction are +queued for BOTH types (GAP-02 regression), enum validation applies to both +(GAP-04), multimodal is gated by flag+PDF not by intake type (GAP-05), and the +external citation guard is preserved. +""" +from __future__ import annotations + +import asyncio +from pathlib import Path +from uuid import uuid4 + +import pytest + +from legal_mcp import config +from legal_mcp.services import db, embeddings, chunker, extractor +from legal_mcp.services import ingest, precedent_library, internal_decisions + + +def _run(coro): + return asyncio.run(coro) + + +class _Chunk: + def __init__(self, i): + self.chunk_index = i + self.content = f"chunk-{i}" + self.section_type = "body" + self.page_number = 1 + self.role = "child" + self.local_id = f"c{i}" + self.parent_local_id = None + + +@pytest.fixture() +def patched(monkeypatch, tmp_path): + """Patch every I/O boundary. Record queue + create calls.""" + calls = {"metadata": [], "halacha": [], "create": [], "chunks": [], "pages": []} + + async def _extract_text(path): + return ("full decision text", 2, [0, 100]) + + def _strip(text): + return text + + def _chunk(text, page_offsets=None): + return [_Chunk(0), _Chunk(1)] + + async def _embed(texts, input_type="document"): + return [[0.0] * 8 for _ in texts] + + async def _store_chunks(cid, dicts): + calls["chunks"].append((cid, len(dicts))) + return len(dicts) + + async def _create_external(**kw): + calls["create"].append(("external", kw)) + return {"id": uuid4()} + + async def _create_internal(**kw): + calls["create"].append(("internal", kw)) + return {"id": uuid4()} + + async def _req_meta(cid): + calls["metadata"].append(cid) + + async def _req_hal(cid): + calls["halacha"].append(cid) + + async def _set_status(cid, status): + return None + + monkeypatch.setattr(extractor, "extract_text", _extract_text) + monkeypatch.setattr(extractor, "strip_nevo_preamble", _strip) + monkeypatch.setattr(chunker, "chunk_document", _chunk) + monkeypatch.setattr(embeddings, "embed_texts", _embed) + monkeypatch.setattr(db, "store_precedent_chunks", _store_chunks) + monkeypatch.setattr(db, "create_external_case_law", _create_external) + monkeypatch.setattr(db, "create_internal_committee_decision", _create_internal) + monkeypatch.setattr(db, "request_metadata_extraction", _req_meta) + monkeypatch.setattr(db, "request_halacha_extraction", _req_hal) + monkeypatch.setattr(db, "set_case_law_extraction_status", _set_status) + monkeypatch.setattr(db, "set_case_law_halacha_status", _set_status) + # Force flat chunking + multimodal OFF unless a test flips it. + monkeypatch.setattr(config, "PARENT_DOC_RETRIEVAL_ENABLED", False) + monkeypatch.setattr(config, "MULTIMODAL_ENABLED", False) + return calls + + +def _make_pdf(tmp_path) -> str: + p = tmp_path / "decision.pdf" + p.write_bytes(b"%PDF-1.4 fake") + return str(p) + + +def test_internal_queues_BOTH_metadata_and_halacha(patched, tmp_path): + """GAP-02 regression: the internal path must queue metadata too.""" + _run(internal_decisions.ingest_internal_decision( + case_number="8046/24", text="decision text", chair_name="דפנה תמיר", + district="ירושלים", practice_area="betterment_levy", + )) + assert len(patched["metadata"]) == 1, "internal path must queue metadata (GAP-02)" + assert len(patched["halacha"]) == 1 + + +def test_external_queues_both(patched, tmp_path): + _run(precedent_library.ingest_precedent( + file_path=_make_pdf(tmp_path), citation="עע\"מ 1234/20", + practice_area="rishuy_uvniya", source_type="court_ruling", + )) + assert len(patched["metadata"]) == 1 + assert len(patched["halacha"]) == 1 + + +def test_both_types_go_through_ingest_document(patched, tmp_path, monkeypatch): + seen = [] + real = ingest.ingest_document + + async def _spy(spec, **kw): + seen.append(spec.source_kind) + return await real(spec, **kw) + + monkeypatch.setattr(ingest, "ingest_document", _spy) + _run(internal_decisions.ingest_internal_decision( + case_number="8046/24", text="t", chair_name="דפנה תמיר", practice_area="betterment_levy")) + _run(precedent_library.ingest_precedent( + file_path=_make_pdf(tmp_path), citation="עע\"מ 1/20", practice_area="rishuy_uvniya")) + assert seen == ["internal_committee", "external_upload"] + + +def test_enum_validation_rejects_bad_practice_area_internal(patched, tmp_path): + """GAP-04: internal path must validate enums like the external one.""" + with pytest.raises(ValueError, match="practice_area"): + _run(internal_decisions.ingest_internal_decision( + case_number="8046/24", text="t", chair_name="x", practice_area="bogus")) + + +def test_enum_validation_rejects_bad_practice_area_external(patched, tmp_path): + with pytest.raises(ValueError, match="practice_area"): + _run(precedent_library.ingest_precedent( + file_path=_make_pdf(tmp_path), citation="עע\"מ 1/20", practice_area="bogus")) + + +def test_external_citation_guard_still_blocks_arar(patched, tmp_path): + with pytest.raises(ValueError, match="ערר"): + _run(precedent_library.ingest_precedent( + file_path=_make_pdf(tmp_path), citation="ערר 1234/24")) + + +def test_internal_text_path_works_without_file(patched): + out = _run(internal_decisions.ingest_internal_decision( + case_number="8046/24", text="t", chair_name="x", practice_area="betterment_levy")) + assert out["status"] == "completed" + assert out["case_law_id"] + + +def test_internal_requires_file_or_text(patched): + with pytest.raises(ValueError, match="file_path or text"): + _run(internal_decisions.ingest_internal_decision( + case_number="8046/24", chair_name="x", practice_area="betterment_levy")) + + +def test_display_name_fallback_uses_canonical_id(patched, tmp_path): + _run(internal_decisions.ingest_internal_decision( + case_number="8046/24", text="t", chair_name="x", practice_area="betterment_levy")) + kind, kw = patched["create"][0] + assert kw["case_name"] == "8046/24", "missing case_name falls back to canonical id" +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_unified_ingest.py -v` +Expected: FAIL — `ModuleNotFoundError: No module named 'legal_mcp.services.ingest'` (or ImportError). + +- [ ] **Step 3: Commit the red tests** + +```bash +cd ~/legal-ai +git add mcp-server/tests/test_unified_ingest.py +git commit -m "test(ingest): failing tests for unified pipeline (FU-1)" +``` + +--- + +## Task 2: Canonical module `ingest.py` — IntakeSpec + shared helpers + +**Files:** +- Create: `mcp-server/src/legal_mcp/services/ingest.py` + +- [ ] **Step 1: Write the module header, IntakeSpec, and shared helpers** + +```python +"""Canonical ingest pipeline (FU-1). + +One pipeline for all sibling-entity intake types (external precedent, +internal committee decision). Per-type variation rides on an ``IntakeSpec`` +config object — never a parallel function. See +docs/spec/01-ingest.md and docs/superpowers/specs/2026-05-30-fu1-unified-ingest-design.md. + +claude_session rule preserved: this module only QUEUES extraction +(``request_*_extraction`` = pure DB writes). It never imports +halacha_extractor / precedent_metadata_extractor, so it is safe to call +from the FastAPI container where the ``claude`` CLI is unavailable. +""" +from __future__ import annotations + +import asyncio +import logging +import re +import shutil +from dataclasses import dataclass +from datetime import date +from pathlib import Path +from typing import Awaitable, Callable +from uuid import UUID, uuid4 + +from legal_mcp import config +from legal_mcp.services import chunker, db, embeddings, extractor + +logger = logging.getLogger(__name__) + +ProgressCb = Callable[[str, int, str], Awaitable[None]] + + +async def _noop_progress(_status: str, _percent: int, _msg: str) -> None: + return None + + +@dataclass(frozen=True) +class IntakeSpec: + """Describes everything that varies between intake types.""" + source_kind: str + id_field: str + staging_root: Path + staging_subdir: Callable[[dict], str] + validate: Callable[[dict], None] + enum_fields: dict[str, frozenset[str]] + derive: Callable[[dict], dict] + display_name_fallback: str + create_record: Callable[..., Awaitable[dict]] + + +def _coerce_date(value) -> date | None: + if value is None or value == "": + return None + if isinstance(value, date): + return value + if isinstance(value, str): + try: + return date.fromisoformat(value[:10]) + except ValueError: + return None + return None + + +def _safe_filename(name: str) -> str: + base = Path(name).name + return re.sub(r"[^\w.\-+א-ת ]", "_", base) or f"upload-{uuid4().hex[:8]}" + + +def _stage_file(src_path: Path, root: Path, subdir: str) -> Path: + dest_dir = root / (subdir or "other") + dest_dir.mkdir(parents=True, exist_ok=True) + dest = dest_dir / f"{uuid4().hex[:8]}_{_safe_filename(src_path.name)}" + shutil.copy2(src_path, dest) + return dest + + +def _validate_enums(spec: IntakeSpec, inputs: dict) -> None: + for field_name, allowed in spec.enum_fields.items(): + value = inputs.get(field_name, "") or "" + if value not in allowed: + raise ValueError(f"invalid {field_name}: {value!r}") +``` + +- [ ] **Step 2: Add the multimodal page-embed helper (moved verbatim from precedent_library.py)** + +```python +async def _embed_pages(case_law_id: UUID, pdf_path: Path, page_count: int) -> dict: + """Render PDF pages → embed via voyage-multimodal → store. Non-fatal caller.""" + thumb_dir = spec_thumb_dir(case_law_id) + rendered = await asyncio.to_thread( + extractor.render_pages_for_multimodal, + pdf_path, config.MULTIMODAL_DPI, config.MULTIMODAL_THUMB_DPI, thumb_dir, + ) + images = [pil for pil, _ in rendered] + thumbs = [t for _, t in rendered] + img_embs = await embeddings.embed_images(images) + + page_records = [] + for i, (emb, thumb) in enumerate(zip(img_embs, thumbs)): + rel_thumb = None + if thumb is not None: + try: + rel_thumb = str(thumb.relative_to(config.DATA_DIR)) + except ValueError: + rel_thumb = str(thumb) + page_records.append({ + "page_number": i + 1, "embedding": emb, "image_thumbnail_path": rel_thumb, + }) + stored = await db.store_precedent_image_embeddings( + case_law_id, page_records, model_name=config.MULTIMODAL_MODEL, + ) + logger.info("Multimodal: stored %d page-image embeddings for case_law %s", stored, case_law_id) + return {"pages_embedded": stored} + + +def spec_thumb_dir(case_law_id: UUID) -> Path: + """Thumbnails live under the precedent-library tree regardless of intake type.""" + return Path(config.DATA_DIR) / "precedent-library" / "thumbnails" / str(case_law_id) +``` + +- [ ] **Step 3: Verify the module imports cleanly** + +Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -c "from legal_mcp.services import ingest; print(ingest.IntakeSpec.__name__)"` +Expected: prints `IntakeSpec`, no error. + +- [ ] **Step 4: Commit** + +```bash +cd ~/legal-ai +git add mcp-server/src/legal_mcp/services/ingest.py +git commit -m "feat(ingest): IntakeSpec + shared helpers for canonical pipeline (FU-1)" +``` + +--- + +## Task 3: Canonical `ingest_document` + +**Files:** +- Modify: `mcp-server/src/legal_mcp/services/ingest.py` (append `ingest_document`) + +- [ ] **Step 1: Append the canonical pipeline function** + +```python +async def ingest_document( + spec: IntakeSpec, + *, + inputs: dict, + file_path: str | Path | None = None, + text: str | None = None, + document_id: UUID | None = None, + progress: ProgressCb | None = None, +) -> dict: + """Run the canonical 12-step pipeline for one intake item. + + ``inputs`` carries the type-specific record fields (citation/case_number, + case_name, court, practice_area, etc.). ``spec`` decides how they are + validated, staged, derived, and which DB-create runs. Returns a dict with + at least: status, case_law_id, chunks. + """ + progress = progress or _noop_progress + + # Step 1: input validation (type-specific) + enums (uniform mechanism). + if not file_path and text is None: + raise ValueError("either file_path or text is required") + spec.validate(inputs) + _validate_enums(spec, inputs) + + # Step 2: field derivation (identity for external). + inputs = {**inputs, **spec.derive(inputs)} + + # Steps 3-5: stage (if file) + extract + strip. + page_count = 0 + page_offsets = None + staged: Path | None = None + if file_path: + src = Path(file_path) + if not src.is_file(): + raise FileNotFoundError(f"file not found: {src}") + await progress("staging", 5, "מעתיק את הקובץ לאחסון") + staged = _stage_file(src, spec.staging_root, spec.staging_subdir(inputs)) + await progress("extracting", 15, "מחלץ טקסט מהקובץ") + try: + raw_text, page_count, page_offsets = await extractor.extract_text(str(staged)) + except Exception as e: + await progress("failed", 100, f"כשל בחילוץ טקסט: {e}") + raise + raw_text = extractor.strip_nevo_preamble((raw_text or "")).strip() + else: + raw_text = (text or "").strip() + if not raw_text: + await progress("failed", 100, "לא נמצא טקסט בקובץ") + raise ValueError("no extractable text in file") + + # Step 6: DB create (type-specific, routed — get case_law_id). + await progress("storing_metadata", 25, "שומר את הרשומה במסד הנתונים") + display_name = (inputs.get("case_name") or "").strip() or ( + inputs.get(spec.display_name_fallback) or "" + ).strip() + record = await spec.create_record( + full_text=raw_text, + case_name=display_name, + decision_date=_coerce_date(inputs.get("decision_date")), + document_id=document_id, + **{k: v for k, v in inputs.items() + if k not in {"case_name", "decision_date", "file_path", "text"}}, + ) + case_law_id = UUID(str(record["id"])) + + try: + stored_chunks = await _chunk_embed_store(case_law_id, raw_text, page_offsets, page_count, progress) + + # Step 9: multimodal — uniform: flag + PDF + page_count, NOT intake type. + if (config.MULTIMODAL_ENABLED and page_count > 0 + and staged is not None and staged.suffix.lower() == ".pdf"): + try: + await progress("embedding_images", 70, f"מטמיע {page_count} עמודי תמונה (multimodal)") + await _embed_pages(case_law_id, staged, page_count) + except Exception as e: + logger.warning("Multimodal embedding failed (non-fatal): %s", e) + + # Steps 10-12: queue BOTH extractions (GAP-02 fix) + statuses. + await db.set_case_law_extraction_status(case_law_id, "completed") + await db.set_case_law_halacha_status(case_law_id, "pending") + await db.request_metadata_extraction(case_law_id) + await db.request_halacha_extraction(case_law_id) + + await progress("completed", 100, + f"נקלט: {stored_chunks} chunks. חילוץ הלכות ומטא-דאטה ממתינים בתור.") + return { + "status": "completed", + "case_law_id": str(case_law_id), + "chunks": stored_chunks, + "halachot": 0, + "halachot_pending": True, + "metadata_filled": [], + "pages": page_count, + } + except Exception as e: + logger.exception("ingest_document failed (%s): %s", spec.source_kind, e) + await db.set_case_law_extraction_status(case_law_id, "failed") + await progress("failed", 100, f"כשל בעיבוד: {e}") + raise + + +async def _chunk_embed_store(case_law_id, text, page_offsets, page_count, progress) -> int: + """Steps 7-8: chunk (hierarchical/flat by flag) → embed children → store.""" + if config.PARENT_DOC_RETRIEVAL_ENABLED: + await progress("chunking", 40, f"מחלק את הטקסט ל-chunks היררכיים ({page_count} עמ')") + h_chunks = chunker.chunk_document_hierarchical(text, page_offsets=page_offsets) + if not h_chunks: + return 0 + children = [c for c in h_chunks if c.role == "child"] + parents = [c for c in h_chunks if c.role == "parent"] + await progress("embedding", 55, f"מייצר embeddings ל-{len(children)} children ({len(parents)} parents)") + child_vectors = await embeddings.embed_texts([c.content for c in children], input_type="document") + chunk_dicts: list[dict] = [] + for p in parents: + chunk_dicts.append({ + "role": "parent", "local_id": p.local_id, "parent_local_id": None, + "chunk_index": p.chunk_index, "content": p.content, + "section_type": p.section_type, "page_number": p.page_number, "embedding": None, + }) + for c, v in zip(children, child_vectors): + chunk_dicts.append({ + "role": "child", "local_id": c.local_id, "parent_local_id": c.parent_local_id, + "chunk_index": c.chunk_index, "content": c.content, + "section_type": c.section_type, "page_number": c.page_number, "embedding": v, + }) + counts = await db.store_precedent_chunks_hierarchical(case_law_id, chunk_dicts) + return counts["children"] + else: + await progress("chunking", 40, f"מחלק את הטקסט ל-chunks ({page_count} עמ')") + chunks = chunker.chunk_document(text, page_offsets=page_offsets) + if not chunks: + return 0 + await progress("embedding", 55, f"מייצר embeddings ל-{len(chunks)} chunks") + chunk_vectors = await embeddings.embed_texts([c.content for c in chunks], input_type="document") + chunk_dicts = [ + {"chunk_index": c.chunk_index, "content": c.content, + "section_type": c.section_type, "page_number": c.page_number, "embedding": v} + for c, v in zip(chunks, chunk_vectors) + ] + return await db.store_precedent_chunks(case_law_id, chunk_dicts) +``` + +- [ ] **Step 2: Verify import** + +Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -c "from legal_mcp.services import ingest; print(ingest.ingest_document.__name__)"` +Expected: prints `ingest_document`. + +- [ ] **Step 3: Commit** + +```bash +cd ~/legal-ai +git add mcp-server/src/legal_mcp/services/ingest.py +git commit -m "feat(ingest): canonical ingest_document pipeline (FU-1)" +``` + +> **Note on `create_record` kwargs:** the wrappers (Tasks 4-5) build `inputs` so the +> leftover keys after popping `case_name`/`decision_date`/`file_path`/`text` exactly match +> each DB-create's remaining parameters. Verify against the signatures: +> `create_external_case_law(case_number, full_text, court, practice_area, appeal_subtype, subject_tags, summary, headnote, source_type, precedent_level, is_binding, ...)` +> and `create_internal_committee_decision(case_number, full_text, court, chair_name, district, practice_area, appeal_subtype, subject_tags, summary, is_binding, proceeding_type, ...)`. + +--- + +## Task 4: External spec + rewrite `ingest_precedent` as wrapper + +**Files:** +- Modify: `mcp-server/src/legal_mcp/services/precedent_library.py` + +- [ ] **Step 1: Replace the top-of-file ingest section with a spec + wrapper** + +Replace the body of `ingest_precedent` (lines ~88-317) and remove `_stage_file`, `_coerce_date`, +`_safe_filename`, `_embed_precedent_pages`, and the `_VALID_*` constants used only by ingest. +Keep `_VALID_PRACTICE_AREAS`/`_VALID_SOURCE_TYPES` values but move them into the spec. Add: + +```python +from legal_mcp.services import ingest + +PRECEDENT_LIBRARY_DIR = Path(config.DATA_DIR) / "precedent-library" + +_VALID_PRACTICE_AREAS = frozenset({"", "rishuy_uvniya", "betterment_levy", "compensation_197"}) +_VALID_SOURCE_TYPES = frozenset({"", "court_ruling", "appeals_committee"}) + + +def _external_validate(inputs: dict) -> None: + citation = (inputs.get("citation") or "").strip() + if not citation: + raise ValueError("citation is required") + if citation.startswith(("ערר ", "ערר(", 'בל"מ ', 'בל"מ(', "ARAR ")): + raise ValueError( + "ציטוט שמתחיל ב-'ערר' או 'בל\"מ' הוא החלטת ועדת ערר. " + "השתמש ב-internal_decision_upload (דורש chair_name + district), " + "לא ב-precedent_library_upload." + ) + + +def _external_staging_subdir(inputs: dict) -> str: + st = inputs.get("source_type") or "" + return st if st in {"court_ruling", "appeals_committee"} else "other" + + +_EXTERNAL_SPEC = ingest.IntakeSpec( + source_kind="external_upload", + id_field="citation", + staging_root=PRECEDENT_LIBRARY_DIR, + staging_subdir=_external_staging_subdir, + validate=_external_validate, + enum_fields={"practice_area": _VALID_PRACTICE_AREAS, "source_type": _VALID_SOURCE_TYPES}, + derive=lambda inputs: {}, + display_name_fallback="citation", + create_record=_create_external_record, +) + + +async def _create_external_record(**kw) -> dict: + """Adapter: maps canonical inputs (citation) to create_external_case_law(case_number).""" + return await db.create_external_case_law( + case_number=kw["citation"].strip(), + case_name=kw["case_name"], + full_text=kw["full_text"], + court=(kw.get("court") or "").strip(), + decision_date=kw.get("decision_date"), + practice_area=kw.get("practice_area", ""), + appeal_subtype=(kw.get("appeal_subtype") or "").strip(), + subject_tags=list(kw.get("subject_tags") or []), + summary=(kw.get("summary") or "").strip(), + headnote=(kw.get("headnote") or "").strip(), + source_type=kw.get("source_type", ""), + precedent_level=kw.get("precedent_level", ""), + is_binding=kw.get("is_binding", True), + document_id=kw.get("document_id"), + ) + + +async def ingest_precedent( + *, + file_path: str | Path, + citation: str, + case_name: str = "", + court: str = "", + decision_date=None, + source_type: str = "", + precedent_level: str = "", + practice_area: str = "", + appeal_subtype: str = "", + subject_tags: list[str] | None = None, + is_binding: bool = True, + headnote: str = "", + summary: str = "", + document_id: UUID | None = None, + progress: ingest.ProgressCb | None = None, +) -> dict: + """Ingest one external precedent. Thin wrapper over the canonical pipeline.""" + inputs = { + "citation": citation, "case_name": case_name, "court": court, + "decision_date": decision_date, "source_type": source_type, + "precedent_level": precedent_level, "practice_area": practice_area, + "appeal_subtype": appeal_subtype, "subject_tags": subject_tags, + "is_binding": is_binding, "headnote": headnote, "summary": summary, + } + return await ingest.ingest_document( + _EXTERNAL_SPEC, inputs=inputs, file_path=file_path, + document_id=document_id, progress=progress, + ) +``` + +> Define `_create_external_record` ABOVE `_EXTERNAL_SPEC` (Python resolves the name at +> dataclass-construction time). Reorder if needed. + +- [ ] **Step 2: Run external-path tests** + +Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_unified_ingest.py -k "external" -v` +Expected: `test_external_queues_both`, `test_enum_validation_rejects_bad_practice_area_external`, +`test_external_citation_guard_still_blocks_arar` PASS. + +- [ ] **Step 3: Commit** + +```bash +cd ~/legal-ai +git add mcp-server/src/legal_mcp/services/precedent_library.py +git commit -m "refactor(ingest): ingest_precedent delegates to canonical pipeline (FU-1)" +``` + +--- + +## Task 5: Internal spec + rewrite `ingest_internal_decision` as wrapper + +**Files:** +- Modify: `mcp-server/src/legal_mcp/services/internal_decisions.py` + +- [ ] **Step 1: Replace the ingest section with a spec + wrapper** + +Remove `_coerce_date`, `_safe_filename`, and the inline pipeline body of +`ingest_internal_decision` (lines ~73-220). Keep `_VALID_DISTRICTS`, `_COURT_TO_DISTRICT`, +`_district_from_court`, and all migrate_*/enrich_*/search_internal functions. Add: + +```python +from legal_mcp.services import ingest + +INTERNAL_DECISIONS_DIR = Path(config.DATA_DIR) / "internal-decisions" + +_VALID_PRACTICE_AREAS = frozenset({"", "rishuy_uvniya", "betterment_levy", "compensation_197"}) +_VALID_DISTRICTS = frozenset({"", "ירושלים", "מרכז", "תל אביב", "צפון", "דרום", "ארצי"}) + + +def _internal_validate(inputs: dict) -> None: + if not (inputs.get("case_number") or "").strip(): + raise ValueError("case_number is required") + + +def _internal_derive(inputs: dict) -> dict: + district = (inputs.get("district") or "").strip() or _district_from_court(inputs.get("court") or "") + proc = (inputs.get("proceeding_type") or "").strip() or derive_proceeding_type( + appeal_subtype=inputs.get("appeal_subtype") or "", subject=inputs.get("case_name") or "", + ) + return {"district": district, "proceeding_type": proc} + + +async def _create_internal_record(**kw) -> dict: + return await db.create_internal_committee_decision( + case_number=kw["case_number"].strip(), + case_name=kw["case_name"], + full_text=kw["full_text"], + court=(kw.get("court") or "").strip(), + decision_date=kw.get("decision_date"), + chair_name=(kw.get("chair_name") or "").strip(), + district=kw.get("district", ""), + practice_area=kw.get("practice_area", ""), + appeal_subtype=(kw.get("appeal_subtype") or "").strip(), + subject_tags=list(kw.get("subject_tags") or []), + summary=(kw.get("summary") or "").strip(), + is_binding=kw.get("is_binding", True), + document_id=kw.get("document_id"), + proceeding_type=kw.get("proceeding_type") or "ערר", + ) + + +_INTERNAL_SPEC = ingest.IntakeSpec( + source_kind="internal_committee", + id_field="case_number", + staging_root=INTERNAL_DECISIONS_DIR, + staging_subdir=lambda inputs: (inputs.get("district") or "other"), + validate=_internal_validate, + enum_fields={"practice_area": _VALID_PRACTICE_AREAS, "district": _VALID_DISTRICTS}, + derive=_internal_derive, + display_name_fallback="case_number", + create_record=_create_internal_record, +) + + +async def ingest_internal_decision( + *, + case_number: str, + case_name: str = "", + court: str = "", + decision_date=None, + chair_name: str = "", + district: str = "", + practice_area: str = "", + appeal_subtype: str = "", + subject_tags: list[str] | None = None, + summary: str = "", + is_binding: bool = True, + file_path: str | Path | None = None, + text: str | None = None, + document_id: UUID | None = None, + queue_halachot: bool = True, # retained for signature compat; pipeline always queues + proceeding_type: str = "", +) -> dict: + """Ingest one appeals-committee decision. Thin wrapper over the canonical pipeline.""" + inputs = { + "case_number": case_number, "case_name": case_name, "court": court, + "decision_date": decision_date, "chair_name": chair_name, "district": district, + "practice_area": practice_area, "appeal_subtype": appeal_subtype, + "subject_tags": subject_tags, "summary": summary, "is_binding": is_binding, + "proceeding_type": proceeding_type, + } + out = await ingest.ingest_document( + _INTERNAL_SPEC, inputs=inputs, file_path=file_path, text=text, + document_id=document_id, + ) + return {"status": out["status"], "case_law_id": out["case_law_id"], + "chunks": out["chunks"], "halachot_pending": True} +``` + +> `queue_halachot=False` was only used by `migrate_from_style_corpus`. The canonical pipeline +> always queues both (per INV-ING3). Confirm with the user during execution that bulk +> re-migration queueing is acceptable; the migrate path is out of FU-1 scope but calls this +> wrapper. If suppression is still required, that is a follow-up — note it, do not silently drop. + +- [ ] **Step 2: Run the full test file** + +Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_unified_ingest.py -v` +Expected: ALL 9 tests PASS — including `test_internal_queues_BOTH_metadata_and_halacha` (GAP-02). + +- [ ] **Step 3: Commit** + +```bash +cd ~/legal-ai +git add mcp-server/src/legal_mcp/services/internal_decisions.py +git commit -m "refactor(ingest): ingest_internal_decision delegates to canonical pipeline; queue metadata too (GAP-02, FU-1)" +``` + +--- + +## Task 6: Dead-code sweep, smoke import, full suite + +**Files:** +- Verify: `mcp-server/src/legal_mcp/services/precedent_library.py`, `internal_decisions.py` + +- [ ] **Step 1: Confirm no orphaned references to removed helpers** + +Run: `cd ~/legal-ai/mcp-server && grep -rn "_embed_precedent_pages\|_stage_file\|_safe_filename\|_coerce_date" src/legal_mcp/services/precedent_library.py src/legal_mcp/services/internal_decisions.py` +Expected: NO matches (all moved to `ingest.py`). If any remain in code paths other than ingest, leave them; if orphaned, delete. + +- [ ] **Step 2: Smoke-import every affected module + its callers** + +Run: +```bash +cd ~/legal-ai/mcp-server && .venv/bin/python -c " +from legal_mcp.services import ingest, precedent_library, internal_decisions +from legal_mcp.tools import precedent_library as t1, internal_decisions as t2 +import inspect +sig_p = inspect.signature(precedent_library.ingest_precedent) +sig_i = inspect.signature(internal_decisions.ingest_internal_decision) +assert 'citation' in sig_p.parameters and 'file_path' in sig_p.parameters +assert 'case_number' in sig_i.parameters and 'text' in sig_i.parameters +print('signatures preserved; imports clean') +" +``` +Expected: prints `signatures preserved; imports clean`. + +- [ ] **Step 3: Run the entire test suite (no regressions elsewhere)** + +Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/ -q` +Expected: all pre-existing tests still pass + the 9 new ones. + +- [ ] **Step 4: Lint the changed files (match repo style)** + +Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m ruff check src/legal_mcp/services/ingest.py src/legal_mcp/services/precedent_library.py src/legal_mcp/services/internal_decisions.py 2>/dev/null || echo "ruff not configured — skip"` +Expected: clean, or "skip". + +- [ ] **Step 5: Update TaskMaster #59 → done** + +Mark subtasks 59.1-59.4 and task 59 as done via task-master (verify via MCP get_task). + +- [ ] **Step 6: Final commit** + +```bash +cd ~/legal-ai +git add -A mcp-server/ +git commit -m "chore(ingest): dead-code sweep + smoke checks for unified pipeline (FU-1)" +``` + +--- + +## Self-Review Notes + +- **GAP-01** (single path) → Tasks 2-5. **GAP-02** (metadata queue) → Task 3 step 1 + test `test_internal_queues_BOTH_metadata_and_halacha`. **GAP-04** (enum validation) → `_validate_enums` + tests. **GAP-05** (staging/derive/multimodal/fallback/guard unified) → Task 3 + specs in Tasks 4-5. +- **Boundary preserved:** DB-create functions untouched (routed via `create_record`); no migration. +- **Open execution check:** `queue_halachot=False` suppression in `migrate_from_style_corpus` (Task 5 note) — surface to user, do not silently change bulk-migration behavior. +- **claude_session rule:** `ingest.py` imports only db/chunker/embeddings/extractor — no LLM extractors. Safe for container. -- 2.49.1 From 9ae2d47d03fb80a9f0221451a3f10a4150e01909 Mon Sep 17 00:00:00 2001 From: Chaim Date: Sat, 30 May 2026 19:09:37 +0000 Subject: [PATCH 3/9] test(ingest): failing tests for unified pipeline (FU-1) Co-Authored-By: Claude Sonnet 4.6 --- mcp-server/tests/test_unified_ingest.py | 169 ++++++++++++++++++++++++ 1 file changed, 169 insertions(+) create mode 100644 mcp-server/tests/test_unified_ingest.py diff --git a/mcp-server/tests/test_unified_ingest.py b/mcp-server/tests/test_unified_ingest.py new file mode 100644 index 0000000..08bbd1e --- /dev/null +++ b/mcp-server/tests/test_unified_ingest.py @@ -0,0 +1,169 @@ +"""FU-1: unified ingest pipeline tests (offline, all I/O monkeypatched). + +Proves both intake types flow through services.ingest.ingest_document and that +the canonical pipeline is symmetric: BOTH metadata and halacha extraction are +queued for BOTH types (GAP-02 regression), enum validation applies to both +(GAP-04), multimodal is gated by flag+PDF not by intake type (GAP-05), and the +external citation guard is preserved. +""" +from __future__ import annotations + +import asyncio +from pathlib import Path +from uuid import uuid4 + +import pytest + +from legal_mcp import config +from legal_mcp.services import db, embeddings, chunker, extractor +from legal_mcp.services import ingest, precedent_library, internal_decisions + + +def _run(coro): + return asyncio.run(coro) + + +class _Chunk: + def __init__(self, i): + self.chunk_index = i + self.content = f"chunk-{i}" + self.section_type = "body" + self.page_number = 1 + self.role = "child" + self.local_id = f"c{i}" + self.parent_local_id = None + + +@pytest.fixture() +def patched(monkeypatch, tmp_path): + """Patch every I/O boundary. Record queue + create calls.""" + calls = {"metadata": [], "halacha": [], "create": [], "chunks": [], "pages": []} + + async def _extract_text(path): + return ("full decision text", 2, [0, 100]) + + def _strip(text): + return text + + def _chunk(text, page_offsets=None): + return [_Chunk(0), _Chunk(1)] + + async def _embed(texts, input_type="document"): + return [[0.0] * 8 for _ in texts] + + async def _store_chunks(cid, dicts): + calls["chunks"].append((cid, len(dicts))) + return len(dicts) + + async def _create_external(**kw): + calls["create"].append(("external", kw)) + return {"id": uuid4()} + + async def _create_internal(**kw): + calls["create"].append(("internal", kw)) + return {"id": uuid4()} + + async def _req_meta(cid): + calls["metadata"].append(cid) + + async def _req_hal(cid): + calls["halacha"].append(cid) + + async def _set_status(cid, status): + return None + + monkeypatch.setattr(extractor, "extract_text", _extract_text) + monkeypatch.setattr(extractor, "strip_nevo_preamble", _strip) + monkeypatch.setattr(chunker, "chunk_document", _chunk) + monkeypatch.setattr(embeddings, "embed_texts", _embed) + monkeypatch.setattr(db, "store_precedent_chunks", _store_chunks) + monkeypatch.setattr(db, "create_external_case_law", _create_external) + monkeypatch.setattr(db, "create_internal_committee_decision", _create_internal) + monkeypatch.setattr(db, "request_metadata_extraction", _req_meta) + monkeypatch.setattr(db, "request_halacha_extraction", _req_hal) + monkeypatch.setattr(db, "set_case_law_extraction_status", _set_status) + monkeypatch.setattr(db, "set_case_law_halacha_status", _set_status) + # Force flat chunking + multimodal OFF unless a test flips it. + monkeypatch.setattr(config, "PARENT_DOC_RETRIEVAL_ENABLED", False) + monkeypatch.setattr(config, "MULTIMODAL_ENABLED", False) + return calls + + +def _make_pdf(tmp_path) -> str: + p = tmp_path / "decision.pdf" + p.write_bytes(b"%PDF-1.4 fake") + return str(p) + + +def test_internal_queues_BOTH_metadata_and_halacha(patched, tmp_path): + """GAP-02 regression: the internal path must queue metadata too.""" + _run(internal_decisions.ingest_internal_decision( + case_number="8046/24", text="decision text", chair_name="דפנה תמיר", + district="ירושלים", practice_area="betterment_levy", + )) + assert len(patched["metadata"]) == 1, "internal path must queue metadata (GAP-02)" + assert len(patched["halacha"]) == 1 + + +def test_external_queues_both(patched, tmp_path): + _run(precedent_library.ingest_precedent( + file_path=_make_pdf(tmp_path), citation="עע\"מ 1234/20", + practice_area="rishuy_uvniya", source_type="court_ruling", + )) + assert len(patched["metadata"]) == 1 + assert len(patched["halacha"]) == 1 + + +def test_both_types_go_through_ingest_document(patched, tmp_path, monkeypatch): + seen = [] + real = ingest.ingest_document + + async def _spy(spec, **kw): + seen.append(spec.source_kind) + return await real(spec, **kw) + + monkeypatch.setattr(ingest, "ingest_document", _spy) + _run(internal_decisions.ingest_internal_decision( + case_number="8046/24", text="t", chair_name="דפנה תמיר", practice_area="betterment_levy")) + _run(precedent_library.ingest_precedent( + file_path=_make_pdf(tmp_path), citation="עע\"מ 1/20", practice_area="rishuy_uvniya")) + assert seen == ["internal_committee", "external_upload"] + + +def test_enum_validation_rejects_bad_practice_area_internal(patched, tmp_path): + """GAP-04: internal path must validate enums like the external one.""" + with pytest.raises(ValueError, match="practice_area"): + _run(internal_decisions.ingest_internal_decision( + case_number="8046/24", text="t", chair_name="x", practice_area="bogus")) + + +def test_enum_validation_rejects_bad_practice_area_external(patched, tmp_path): + with pytest.raises(ValueError, match="practice_area"): + _run(precedent_library.ingest_precedent( + file_path=_make_pdf(tmp_path), citation="עע\"מ 1/20", practice_area="bogus")) + + +def test_external_citation_guard_still_blocks_arar(patched, tmp_path): + with pytest.raises(ValueError, match="ערר"): + _run(precedent_library.ingest_precedent( + file_path=_make_pdf(tmp_path), citation="ערר 1234/24")) + + +def test_internal_text_path_works_without_file(patched): + out = _run(internal_decisions.ingest_internal_decision( + case_number="8046/24", text="t", chair_name="x", practice_area="betterment_levy")) + assert out["status"] == "completed" + assert out["case_law_id"] + + +def test_internal_requires_file_or_text(patched): + with pytest.raises(ValueError, match="file_path or text"): + _run(internal_decisions.ingest_internal_decision( + case_number="8046/24", chair_name="x", practice_area="betterment_levy")) + + +def test_display_name_fallback_uses_canonical_id(patched, tmp_path): + _run(internal_decisions.ingest_internal_decision( + case_number="8046/24", text="t", chair_name="x", practice_area="betterment_levy")) + kind, kw = patched["create"][0] + assert kw["case_name"] == "8046/24", "missing case_name falls back to canonical id" -- 2.49.1 From d4663eba8f6a454d8b8cf3b19e94d978dd154653 Mon Sep 17 00:00:00 2001 From: Chaim Date: Sat, 30 May 2026 19:11:27 +0000 Subject: [PATCH 4/9] feat(ingest): IntakeSpec + shared helpers for canonical pipeline (FU-1) Co-Authored-By: Claude Sonnet 4.6 --- mcp-server/src/legal_mcp/services/ingest.py | 115 ++++++++++++++++++++ 1 file changed, 115 insertions(+) create mode 100644 mcp-server/src/legal_mcp/services/ingest.py diff --git a/mcp-server/src/legal_mcp/services/ingest.py b/mcp-server/src/legal_mcp/services/ingest.py new file mode 100644 index 0000000..64c7b82 --- /dev/null +++ b/mcp-server/src/legal_mcp/services/ingest.py @@ -0,0 +1,115 @@ +"""Canonical ingest pipeline (FU-1). + +One pipeline for all sibling-entity intake types (external precedent, +internal committee decision). Per-type variation rides on an ``IntakeSpec`` +config object — never a parallel function. See +docs/spec/01-ingest.md and docs/superpowers/specs/2026-05-30-fu1-unified-ingest-design.md. + +claude_session rule preserved: this module only QUEUES extraction +(``request_*_extraction`` = pure DB writes). It never imports +halacha_extractor / precedent_metadata_extractor, so it is safe to call +from the FastAPI container where the ``claude`` CLI is unavailable. +""" +from __future__ import annotations + +import asyncio +import logging +import re +import shutil +from dataclasses import dataclass +from datetime import date +from pathlib import Path +from typing import Awaitable, Callable +from uuid import UUID, uuid4 + +from legal_mcp import config +from legal_mcp.services import chunker, db, embeddings, extractor + +logger = logging.getLogger(__name__) + +ProgressCb = Callable[[str, int, str], Awaitable[None]] + + +async def _noop_progress(_status: str, _percent: int, _msg: str) -> None: + return None + + +@dataclass(frozen=True) +class IntakeSpec: + """Describes everything that varies between intake types.""" + source_kind: str + id_field: str + staging_root: Path + staging_subdir: Callable[[dict], str] + validate: Callable[[dict], None] + enum_fields: dict[str, frozenset[str]] + derive: Callable[[dict], dict] + display_name_fallback: str + create_record: Callable[..., Awaitable[dict]] + + +def _coerce_date(value) -> date | None: + if value is None or value == "": + return None + if isinstance(value, date): + return value + if isinstance(value, str): + try: + return date.fromisoformat(value[:10]) + except ValueError: + return None + return None + + +def _safe_filename(name: str) -> str: + base = Path(name).name + return re.sub(r"[^\w.\-+א-ת ]", "_", base) or f"upload-{uuid4().hex[:8]}" + + +def _stage_file(src_path: Path, root: Path, subdir: str) -> Path: + dest_dir = root / (subdir or "other") + dest_dir.mkdir(parents=True, exist_ok=True) + dest = dest_dir / f"{uuid4().hex[:8]}_{_safe_filename(src_path.name)}" + shutil.copy2(src_path, dest) + return dest + + +def _validate_enums(spec: IntakeSpec, inputs: dict) -> None: + for field_name, allowed in spec.enum_fields.items(): + value = inputs.get(field_name, "") or "" + if value not in allowed: + raise ValueError(f"invalid {field_name}: {value!r}") + + +async def _embed_pages(case_law_id: UUID, pdf_path: Path, page_count: int) -> dict: + """Render PDF pages → embed via voyage-multimodal → store. Non-fatal caller.""" + thumb_dir = spec_thumb_dir(case_law_id) + rendered = await asyncio.to_thread( + extractor.render_pages_for_multimodal, + pdf_path, config.MULTIMODAL_DPI, config.MULTIMODAL_THUMB_DPI, thumb_dir, + ) + images = [pil for pil, _ in rendered] + thumbs = [t for _, t in rendered] + img_embs = await embeddings.embed_images(images) + + page_records = [] + for i, (emb, thumb) in enumerate(zip(img_embs, thumbs)): + rel_thumb = None + if thumb is not None: + try: + rel_thumb = str(thumb.relative_to(config.DATA_DIR)) + except ValueError: + rel_thumb = str(thumb) + page_records.append({ + "page_number": i + 1, "embedding": emb, "image_thumbnail_path": rel_thumb, + }) + stored = await db.store_precedent_image_embeddings( + case_law_id, page_records, model_name=config.MULTIMODAL_MODEL, + ) + logger.info("Multimodal: stored %d page-image embeddings for case_law %s", stored, case_law_id) + return {"pages_embedded": stored} + + +def spec_thumb_dir(case_law_id: UUID) -> Path: + """Thumbnails live under the precedent-library tree regardless of intake type.""" + return Path(config.DATA_DIR) / "precedent-library" / "thumbnails" / str(case_law_id) -- 2.49.1 From be4f7bbe9962f6b5e0448e7289520790168ddba9 Mon Sep 17 00:00:00 2001 From: Chaim Date: Sat, 30 May 2026 19:13:15 +0000 Subject: [PATCH 5/9] feat(ingest): canonical ingest_document pipeline (FU-1) --- mcp-server/src/legal_mcp/services/ingest.py | 142 ++++++++++++++++++++ 1 file changed, 142 insertions(+) diff --git a/mcp-server/src/legal_mcp/services/ingest.py b/mcp-server/src/legal_mcp/services/ingest.py index 64c7b82..45a9cc6 100644 --- a/mcp-server/src/legal_mcp/services/ingest.py +++ b/mcp-server/src/legal_mcp/services/ingest.py @@ -113,3 +113,145 @@ async def _embed_pages(case_law_id: UUID, pdf_path: Path, page_count: int) -> di def spec_thumb_dir(case_law_id: UUID) -> Path: """Thumbnails live under the precedent-library tree regardless of intake type.""" return Path(config.DATA_DIR) / "precedent-library" / "thumbnails" / str(case_law_id) + + +async def ingest_document( + spec: IntakeSpec, + *, + inputs: dict, + file_path: str | Path | None = None, + text: str | None = None, + document_id: UUID | None = None, + progress: ProgressCb | None = None, +) -> dict: + """Run the canonical 12-step pipeline for one intake item. + + ``inputs`` carries the type-specific record fields (citation/case_number, + case_name, court, practice_area, etc.). ``spec`` decides how they are + validated, staged, derived, and which DB-create runs. Returns a dict with + at least: status, case_law_id, chunks. + """ + progress = progress or _noop_progress + + # Step 1: input validation (type-specific) + enums (uniform mechanism). + if not file_path and text is None: + raise ValueError("either file_path or text is required") + spec.validate(inputs) + _validate_enums(spec, inputs) + + # Step 2: field derivation (identity for external). + inputs = {**inputs, **spec.derive(inputs)} + + # Steps 3-5: stage (if file) + extract + strip. + page_count = 0 + page_offsets = None + staged: Path | None = None + if file_path: + src = Path(file_path) + if not src.is_file(): + raise FileNotFoundError(f"file not found: {src}") + await progress("staging", 5, "מעתיק את הקובץ לאחסון") + staged = _stage_file(src, spec.staging_root, spec.staging_subdir(inputs)) + await progress("extracting", 15, "מחלץ טקסט מהקובץ") + try: + raw_text, page_count, page_offsets = await extractor.extract_text(str(staged)) + except Exception as e: + await progress("failed", 100, f"כשל בחילוץ טקסט: {e}") + raise + raw_text = extractor.strip_nevo_preamble((raw_text or "")).strip() + else: + raw_text = (text or "").strip() + if not raw_text: + await progress("failed", 100, "לא נמצא טקסט בקובץ") + raise ValueError("no extractable text in file") + + # Step 6: DB create (type-specific, routed — get case_law_id). + await progress("storing_metadata", 25, "שומר את הרשומה במסד הנתונים") + display_name = (inputs.get("case_name") or "").strip() or ( + inputs.get(spec.display_name_fallback) or "" + ).strip() + record = await spec.create_record( + full_text=raw_text, + case_name=display_name, + decision_date=_coerce_date(inputs.get("decision_date")), + document_id=document_id, + **{k: v for k, v in inputs.items() + if k not in {"case_name", "decision_date", "file_path", "text"}}, + ) + case_law_id = UUID(str(record["id"])) + + try: + stored_chunks = await _chunk_embed_store(case_law_id, raw_text, page_offsets, page_count, progress) + + # Step 9: multimodal — uniform: flag + PDF + page_count, NOT intake type. + if (config.MULTIMODAL_ENABLED and page_count > 0 + and staged is not None and staged.suffix.lower() == ".pdf"): + try: + await progress("embedding_images", 70, f"מטמיע {page_count} עמודי תמונה (multimodal)") + await _embed_pages(case_law_id, staged, page_count) + except Exception as e: + logger.warning("Multimodal embedding failed (non-fatal): %s", e) + + # Steps 10-12: queue BOTH extractions (GAP-02 fix) + statuses. + await db.set_case_law_extraction_status(case_law_id, "completed") + await db.set_case_law_halacha_status(case_law_id, "pending") + await db.request_metadata_extraction(case_law_id) + await db.request_halacha_extraction(case_law_id) + + await progress("completed", 100, + f"נקלט: {stored_chunks} chunks. חילוץ הלכות ומטא-דאטה ממתינים בתור.") + return { + "status": "completed", + "case_law_id": str(case_law_id), + "chunks": stored_chunks, + "halachot": 0, + "halachot_pending": True, + "metadata_filled": [], + "pages": page_count, + } + except Exception as e: + logger.exception("ingest_document failed (%s): %s", spec.source_kind, e) + await db.set_case_law_extraction_status(case_law_id, "failed") + await progress("failed", 100, f"כשל בעיבוד: {e}") + raise + + +async def _chunk_embed_store(case_law_id, text, page_offsets, page_count, progress) -> int: + """Steps 7-8: chunk (hierarchical/flat by flag) → embed children → store.""" + if config.PARENT_DOC_RETRIEVAL_ENABLED: + await progress("chunking", 40, f"מחלק את הטקסט ל-chunks היררכיים ({page_count} עמ')") + h_chunks = chunker.chunk_document_hierarchical(text, page_offsets=page_offsets) + if not h_chunks: + return 0 + children = [c for c in h_chunks if c.role == "child"] + parents = [c for c in h_chunks if c.role == "parent"] + await progress("embedding", 55, f"מייצר embeddings ל-{len(children)} children ({len(parents)} parents)") + child_vectors = await embeddings.embed_texts([c.content for c in children], input_type="document") + chunk_dicts: list[dict] = [] + for p in parents: + chunk_dicts.append({ + "role": "parent", "local_id": p.local_id, "parent_local_id": None, + "chunk_index": p.chunk_index, "content": p.content, + "section_type": p.section_type, "page_number": p.page_number, "embedding": None, + }) + for c, v in zip(children, child_vectors): + chunk_dicts.append({ + "role": "child", "local_id": c.local_id, "parent_local_id": c.parent_local_id, + "chunk_index": c.chunk_index, "content": c.content, + "section_type": c.section_type, "page_number": c.page_number, "embedding": v, + }) + counts = await db.store_precedent_chunks_hierarchical(case_law_id, chunk_dicts) + return counts["children"] + else: + await progress("chunking", 40, f"מחלק את הטקסט ל-chunks ({page_count} עמ')") + chunks = chunker.chunk_document(text, page_offsets=page_offsets) + if not chunks: + return 0 + await progress("embedding", 55, f"מייצר embeddings ל-{len(chunks)} chunks") + chunk_vectors = await embeddings.embed_texts([c.content for c in chunks], input_type="document") + chunk_dicts = [ + {"chunk_index": c.chunk_index, "content": c.content, + "section_type": c.section_type, "page_number": c.page_number, "embedding": v} + for c, v in zip(chunks, chunk_vectors) + ] + return await db.store_precedent_chunks(case_law_id, chunk_dicts) -- 2.49.1 From d7eb1b28247c432ae4b98de0bb1067fdb8de65ff Mon Sep 17 00:00:00 2001 From: Chaim Date: Sat, 30 May 2026 19:16:29 +0000 Subject: [PATCH 6/9] refactor(ingest): ingest_precedent delegates to canonical pipeline (FU-1) Co-Authored-By: Claude Opus 4.8 (1M context) --- .../legal_mcp/services/precedent_library.py | 351 +++--------------- 1 file changed, 60 insertions(+), 291 deletions(-) diff --git a/mcp-server/src/legal_mcp/services/precedent_library.py b/mcp-server/src/legal_mcp/services/precedent_library.py index 43958c2..303a1cd 100644 --- a/mcp-server/src/legal_mcp/services/precedent_library.py +++ b/mcp-server/src/legal_mcp/services/precedent_library.py @@ -15,15 +15,12 @@ from __future__ import annotations import asyncio import logging -import re -import shutil -from datetime import date from pathlib import Path from typing import Awaitable, Callable -from uuid import UUID, uuid4 +from uuid import UUID from legal_mcp import config -from legal_mcp.services import chunker, db, embeddings, extractor, hybrid_search, rerank # noqa: F401 +from legal_mcp.services import chunker, db, embeddings, extractor, hybrid_search, ingest, rerank # noqa: F401 # Note: halacha_extractor and precedent_metadata_extractor are NOT imported # at module load. They are imported lazily inside the dedicated re-extract @@ -40,8 +37,8 @@ ProgressCb = Callable[[str, int, str], Awaitable[None]] PRECEDENT_LIBRARY_DIR = Path(config.DATA_DIR) / "precedent-library" -_VALID_PRACTICE_AREAS = {"", "rishuy_uvniya", "betterment_levy", "compensation_197"} -_VALID_SOURCE_TYPES = {"", "court_ruling", "appeals_committee"} +_VALID_PRACTICE_AREAS = frozenset({"", "rishuy_uvniya", "betterment_levy", "compensation_197"}) +_VALID_SOURCE_TYPES = frozenset({"", "court_ruling", "appeals_committee"}) _VALID_PRECEDENT_LEVELS = { "", "עליון", "מנהלי", "ועדת_ערר_ארצית", "ועדת_ערר_מחוזית", "supreme", "administrative", "national_appeals_committee", "district_appeals_committee", @@ -52,37 +49,54 @@ async def _noop_progress(_status: str, _percent: int, _msg: str) -> None: return None -def _safe_filename(name: str) -> str: - """Strip path separators and unsafe chars from a user-provided name.""" - base = Path(name).name - return re.sub(r"[^\w.\-+א-ת ]", "_", base) or f"upload-{uuid4().hex[:8]}" +def _external_validate(inputs: dict) -> None: + citation = (inputs.get("citation") or "").strip() + if not citation: + raise ValueError("citation is required") + if citation.startswith(("ערר ", "ערר(", 'בל"מ ', 'בל"מ(', "ARAR ")): + raise ValueError( + "ציטוט שמתחיל ב-'ערר' או 'בל\"מ' הוא החלטת ועדת ערר. " + "השתמש ב-internal_decision_upload (דורש chair_name + district), " + "לא ב-precedent_library_upload." + ) -def _stage_file(src_path: Path, source_type: str) -> Path: - """Copy the uploaded file into data/precedent-library//. - - Returns the destination path. Source file is not deleted (caller decides). - """ - sub = source_type if source_type in {"court_ruling", "appeals_committee"} else "other" - dest_dir = PRECEDENT_LIBRARY_DIR / sub - dest_dir.mkdir(parents=True, exist_ok=True) - safe_name = _safe_filename(src_path.name) - dest = dest_dir / f"{uuid4().hex[:8]}_{safe_name}" - shutil.copy2(src_path, dest) - return dest +def _external_staging_subdir(inputs: dict) -> str: + st = inputs.get("source_type") or "" + return st if st in {"court_ruling", "appeals_committee"} else "other" -def _coerce_date(value) -> date | None: - if value is None or value == "": - return None - if isinstance(value, date): - return value - if isinstance(value, str): - try: - return date.fromisoformat(value[:10]) - except ValueError: - return None - return None +async def _create_external_record(**kw) -> dict: + """Adapter: maps canonical inputs (citation) to create_external_case_law(case_number).""" + return await db.create_external_case_law( + case_number=kw["citation"].strip(), + case_name=kw["case_name"], + full_text=kw["full_text"], + court=(kw.get("court") or "").strip(), + decision_date=kw.get("decision_date"), + practice_area=kw.get("practice_area", ""), + appeal_subtype=(kw.get("appeal_subtype") or "").strip(), + subject_tags=list(kw.get("subject_tags") or []), + summary=(kw.get("summary") or "").strip(), + headnote=(kw.get("headnote") or "").strip(), + source_type=kw.get("source_type", ""), + precedent_level=kw.get("precedent_level", ""), + is_binding=kw.get("is_binding", True), + document_id=kw.get("document_id"), + ) + + +_EXTERNAL_SPEC = ingest.IntakeSpec( + source_kind="external_upload", + id_field="citation", + staging_root=PRECEDENT_LIBRARY_DIR, + staging_subdir=_external_staging_subdir, + validate=_external_validate, + enum_fields={"practice_area": _VALID_PRACTICE_AREAS, "source_type": _VALID_SOURCE_TYPES}, + derive=lambda inputs: {}, + display_name_fallback="citation", + create_record=_create_external_record, +) async def ingest_precedent( @@ -101,220 +115,20 @@ async def ingest_precedent( headnote: str = "", summary: str = "", document_id: UUID | None = None, - progress: ProgressCb | None = None, + progress: ingest.ProgressCb | None = None, ) -> dict: - """Ingest a single uploaded precedent through the full pipeline. - - Required: file_path + citation. Everything else has a sensible default. - - Returns: - ``{"status": "...", "case_law_id": "...", "chunks": N, "halachot": M}`` - """ - progress = progress or _noop_progress - src = Path(file_path) - if not src.is_file(): - raise FileNotFoundError(f"file not found: {src}") - if not citation.strip(): - raise ValueError("citation is required") - # Citation guard at service level (catches both MCP and HTTP API paths). - # Appeals-committee decisions must go through ingest_internal_decision - # which records chair_name+district. The MCP wrapper has the same guard - # for an earlier, friendlier error message — but this is the source of - # truth. See TaskMaster #30(ב) and DB constraint case_law_external_arar_check. - _norm = citation.strip() - if _norm.startswith(("ערר ", "ערר(", "בל\"מ ", "בל\"מ(", "ARAR ")): - raise ValueError( - "ציטוט שמתחיל ב-'ערר' או 'בל\"מ' הוא החלטת ועדת ערר. " - "השתמש ב-internal_decision_upload (דורש chair_name + district), " - "לא ב-precedent_library_upload." - ) - if practice_area not in _VALID_PRACTICE_AREAS: - raise ValueError(f"invalid practice_area: {practice_area!r}") - if source_type not in _VALID_SOURCE_TYPES: - raise ValueError(f"invalid source_type: {source_type!r}") - - await progress("staging", 5, "מעתיק את הקובץ לאחסון") - - staged = _stage_file(src, source_type) - - await progress("extracting", 15, "מחלץ טקסט מהקובץ") - try: - text, page_count, page_offsets = await extractor.extract_text(str(staged)) - except Exception as e: - await progress("failed", 100, f"כשל בחילוץ טקסט: {e}") - raise - - text = (text or "").strip() - if not text: - await progress("failed", 100, "לא נמצא טקסט בקובץ") - raise ValueError("no extractable text in file") - - # Strip any Nevo preamble that might wrap court rulings downloaded from Nevo. - text = extractor.strip_nevo_preamble(text) - - await progress("storing_metadata", 25, "שומר את הפסיקה במסד הנתונים") - record = await db.create_external_case_law( - case_number=citation.strip(), - case_name=case_name.strip() or citation.strip(), - full_text=text, - court=court.strip(), - decision_date=_coerce_date(decision_date), - practice_area=practice_area, - appeal_subtype=appeal_subtype.strip(), - subject_tags=list(subject_tags or []), - summary=summary.strip(), - headnote=headnote.strip(), - source_type=source_type, - precedent_level=precedent_level, - is_binding=is_binding, - document_id=document_id, + """Ingest one external precedent. Thin wrapper over the canonical pipeline.""" + inputs = { + "citation": citation, "case_name": case_name, "court": court, + "decision_date": decision_date, "source_type": source_type, + "precedent_level": precedent_level, "practice_area": practice_area, + "appeal_subtype": appeal_subtype, "subject_tags": subject_tags, + "is_binding": is_binding, "headnote": headnote, "summary": summary, + } + return await ingest.ingest_document( + _EXTERNAL_SPEC, inputs=inputs, file_path=file_path, + document_id=document_id, progress=progress, ) - case_law_id = UUID(str(record["id"])) - - try: - # Parent-doc retrieval (TaskMaster #48): when enabled, emit - # two tiers (parents + children). Only children are embedded - # and indexed; parents carry retrieval context. When disabled, - # fall back to legacy single-tier chunking — identical - # behaviour to pre-V17. - if config.PARENT_DOC_RETRIEVAL_ENABLED: - await progress( - "chunking", 40, - f"מחלק את הטקסט ל-chunks היררכיים ({page_count} עמ')", - ) - h_chunks = chunker.chunk_document_hierarchical( - text, page_offsets=page_offsets, - ) - if not h_chunks: - await db.set_case_law_extraction_status(case_law_id, "completed") - await db.set_case_law_halacha_status(case_law_id, "completed") - await progress("completed", 100, "אין טקסט לעיבוד") - return { - "status": "completed", - "case_law_id": str(case_law_id), - "chunks": 0, - "halachot": 0, - } - - children = [c for c in h_chunks if c.role == "child"] - parents = [c for c in h_chunks if c.role == "parent"] - await progress( - "embedding", 55, - f"מייצר embeddings ל-{len(children)} children " - f"({len(parents)} parents)", - ) - child_texts = [c.content for c in children] - child_vectors = await embeddings.embed_texts( - child_texts, input_type="document", - ) - # Build flat dict list for the two-pass writer. - chunk_dicts: list[dict] = [] - for p in parents: - chunk_dicts.append({ - "role": "parent", - "local_id": p.local_id, - "parent_local_id": None, - "chunk_index": p.chunk_index, - "content": p.content, - "section_type": p.section_type, - "page_number": p.page_number, - "embedding": None, - }) - for c, v in zip(children, child_vectors): - chunk_dicts.append({ - "role": "child", - "local_id": c.local_id, - "parent_local_id": c.parent_local_id, - "chunk_index": c.chunk_index, - "content": c.content, - "section_type": c.section_type, - "page_number": c.page_number, - "embedding": v, - }) - counts = await db.store_precedent_chunks_hierarchical( - case_law_id, chunk_dicts, - ) - stored_chunks = counts["children"] - else: - await progress( - "chunking", 40, f"מחלק את הטקסט ל-chunks ({page_count} עמ')", - ) - chunks = chunker.chunk_document(text, page_offsets=page_offsets) - if not chunks: - await db.set_case_law_extraction_status(case_law_id, "completed") - await db.set_case_law_halacha_status(case_law_id, "completed") - await progress("completed", 100, "אין טקסט לעיבוד") - return { - "status": "completed", - "case_law_id": str(case_law_id), - "chunks": 0, - "halachot": 0, - } - - await progress("embedding", 55, f"מייצר embeddings ל-{len(chunks)} chunks") - chunk_texts = [c.content for c in chunks] - chunk_vectors = await embeddings.embed_texts(chunk_texts, input_type="document") - - chunk_dicts = [ - { - "chunk_index": c.chunk_index, - "content": c.content, - "section_type": c.section_type, - "page_number": c.page_number, - "embedding": v, - } - for c, v in zip(chunks, chunk_vectors) - ] - stored_chunks = await db.store_precedent_chunks(case_law_id, chunk_dicts) - - # Multimodal page-image embeddings (V9). Gated by feature flag. - # Non-fatal: text path already succeeded. Only PDFs. - if config.MULTIMODAL_ENABLED and page_count > 0 and staged.suffix.lower() == ".pdf": - try: - await progress( - "embedding_images", 70, - f"מטמיע {page_count} עמודי תמונה (multimodal)", - ) - await _embed_precedent_pages(case_law_id, staged, page_count) - except Exception as e: - logger.warning("Precedent multimodal embedding failed (non-fatal): %s", e) - - # Pipeline split: the container does the non-LLM half (extract + - # chunk + embed + store). LLM-driven extraction (metadata, halachot) - # runs separately via the MCP tool `precedent_process_pending` from - # local Claude Code, where `claude` CLI is available. - # - # We auto-queue both extractions so the chair doesn't need to click - # any button — the moment they (or me) run `precedent_process_pending` - # in chat, both kinds get processed. - await db.set_case_law_extraction_status(case_law_id, "completed") - await db.set_case_law_halacha_status(case_law_id, "pending") - await db.request_metadata_extraction(case_law_id) - await db.request_halacha_extraction(case_law_id) - - await progress( - "completed", - 100, - f"הוכנס לספרייה: {stored_chunks} chunks. " - f"חילוץ הלכות ומטא-דאטה ממתינים בתור — " - f"להפעיל מ-Claude Code: precedent_process_pending.", - ) - - return { - "status": "completed", - "case_law_id": str(case_law_id), - "chunks": stored_chunks, - "halachot": 0, - "halachot_pending": True, - "metadata_filled": [], - "pages": page_count, - } - - except Exception as e: - logger.exception("precedent_library.ingest_precedent failed: %s", e) - await db.set_case_law_extraction_status(case_law_id, "failed") - await progress("failed", 100, f"כשל בעיבוד: {e}") - raise async def reextract_halachot( @@ -586,48 +400,3 @@ async def search_library( subject_tag=subject_tag, include_halachot=include_halachot, ) - - -async def _embed_precedent_pages( - case_law_id: UUID, - pdf_path: Path, - page_count: int, -) -> dict: - """Render precedent PDF pages → embed via voyage-multimodal → store. - - Thumbnails go to - ``data/precedent-library/thumbnails/{case_law_id}/p{N:03d}.jpg``. - """ - thumb_dir = PRECEDENT_LIBRARY_DIR / "thumbnails" / str(case_law_id) - rendered = await asyncio.to_thread( - extractor.render_pages_for_multimodal, - pdf_path, - config.MULTIMODAL_DPI, - config.MULTIMODAL_THUMB_DPI, - thumb_dir, - ) - images = [pil for pil, _ in rendered] - thumbs = [t for _, t in rendered] - img_embs = await embeddings.embed_images(images) - - page_records = [] - for i, (emb, thumb) in enumerate(zip(img_embs, thumbs)): - rel_thumb = None - if thumb is not None: - try: - rel_thumb = str(thumb.relative_to(config.DATA_DIR)) - except ValueError: - rel_thumb = str(thumb) - page_records.append({ - "page_number": i + 1, - "embedding": emb, - "image_thumbnail_path": rel_thumb, - }) - stored = await db.store_precedent_image_embeddings( - case_law_id, page_records, model_name=config.MULTIMODAL_MODEL, - ) - logger.info( - "Multimodal: stored %d page-image embeddings for case_law %s", - stored, case_law_id, - ) - return {"pages_embedded": stored} -- 2.49.1 From 5104db8f4eaf3dddcab9322d413e280dbb3e3a90 Mon Sep 17 00:00:00 2001 From: Chaim Date: Sat, 30 May 2026 19:19:10 +0000 Subject: [PATCH 7/9] refactor(ingest): ingest_internal_decision delegates to canonical pipeline; queue metadata too (GAP-02, FU-1) Co-Authored-By: Claude Opus 4.8 (1M context) --- .../legal_mcp/services/internal_decisions.py | 214 +++++------------- 1 file changed, 62 insertions(+), 152 deletions(-) diff --git a/mcp-server/src/legal_mcp/services/internal_decisions.py b/mcp-server/src/legal_mcp/services/internal_decisions.py index 5c051a2..5a0dd4e 100644 --- a/mcp-server/src/legal_mcp/services/internal_decisions.py +++ b/mcp-server/src/legal_mcp/services/internal_decisions.py @@ -16,21 +16,19 @@ Judicial decisions (Supreme Court, Administrative Court) stay in external_upload from __future__ import annotations import logging -import re -import shutil -from datetime import date from pathlib import Path -from uuid import UUID, uuid4 +from uuid import UUID from legal_mcp import config -from legal_mcp.services import chunker, db, embeddings, extractor +from legal_mcp.services import db, embeddings, ingest from legal_mcp.services.practice_area import derive_proceeding_type logger = logging.getLogger(__name__) INTERNAL_DECISIONS_DIR = Path(config.DATA_DIR) / "internal-decisions" -_VALID_DISTRICTS = {"", "ירושלים", "מרכז", "תל אביב", "צפון", "דרום", "ארצי"} +_VALID_PRACTICE_AREAS = frozenset({"", "rishuy_uvniya", "betterment_levy", "compensation_197"}) +_VALID_DISTRICTS = frozenset({"", "ירושלים", "מרכז", "תל אביב", "צפון", "דרום", "ארצי"}) _COURT_TO_DISTRICT = [ ("ירושלים", "ירושלים"), @@ -45,24 +43,6 @@ _COURT_TO_DISTRICT = [ ] -def _coerce_date(value) -> date | None: - if value is None or value == "": - return None - if isinstance(value, date): - return value - if isinstance(value, str): - try: - return date.fromisoformat(value[:10]) - except ValueError: - return None - return None - - -def _safe_filename(name: str) -> str: - base = Path(name).name - return re.sub(r"[^\w.\-+א-ת ]", "_", base) or f"internal-{uuid4().hex[:8]}" - - def _district_from_court(court: str) -> str: for keyword, district in _COURT_TO_DISTRICT: if keyword in court: @@ -70,6 +50,51 @@ def _district_from_court(court: str) -> str: return "" +def _internal_validate(inputs: dict) -> None: + if not (inputs.get("case_number") or "").strip(): + raise ValueError("case_number is required") + + +def _internal_derive(inputs: dict) -> dict: + district = (inputs.get("district") or "").strip() or _district_from_court(inputs.get("court") or "") + proc = (inputs.get("proceeding_type") or "").strip() or derive_proceeding_type( + appeal_subtype=inputs.get("appeal_subtype") or "", subject=inputs.get("case_name") or "", + ) + return {"district": district, "proceeding_type": proc} + + +async def _create_internal_record(**kw) -> dict: + return await db.create_internal_committee_decision( + case_number=kw["case_number"].strip(), + case_name=kw["case_name"], + full_text=kw["full_text"], + court=(kw.get("court") or "").strip(), + decision_date=kw.get("decision_date"), + chair_name=(kw.get("chair_name") or "").strip(), + district=kw.get("district", ""), + practice_area=kw.get("practice_area", ""), + appeal_subtype=(kw.get("appeal_subtype") or "").strip(), + subject_tags=list(kw.get("subject_tags") or []), + summary=(kw.get("summary") or "").strip(), + is_binding=kw.get("is_binding", True), + document_id=kw.get("document_id"), + proceeding_type=kw.get("proceeding_type") or "ערר", + ) + + +_INTERNAL_SPEC = ingest.IntakeSpec( + source_kind="internal_committee", + id_field="case_number", + staging_root=INTERNAL_DECISIONS_DIR, + staging_subdir=lambda inputs: (inputs.get("district") or "other"), + validate=_internal_validate, + enum_fields={"practice_area": _VALID_PRACTICE_AREAS, "district": _VALID_DISTRICTS}, + derive=_internal_derive, + display_name_fallback="case_number", + create_record=_create_internal_record, +) + + async def ingest_internal_decision( *, case_number: str, @@ -86,138 +111,23 @@ async def ingest_internal_decision( file_path: str | Path | None = None, text: str | None = None, document_id: UUID | None = None, - queue_halachot: bool = True, + queue_halachot: bool = True, # retained for signature compat; pipeline always queues proceeding_type: str = "", ) -> dict: - """Ingest an appeals-committee decision into the internal corpus. - - Either file_path or text must be provided. - If district is empty, it is inferred from court. - If proceeding_type is empty, it is derived from appeal_subtype/case_name. - Returns: {"status": "completed", "case_law_id": "...", "chunks": N} - """ - if not file_path and not text: - raise ValueError("either file_path or text is required") - if not case_number.strip(): - raise ValueError("case_number is required") - - resolved_district = district.strip() or _district_from_court(court) - resolved_proc = proceeding_type.strip() or derive_proceeding_type( - appeal_subtype=appeal_subtype, subject=case_name, - ) - - if file_path: - src = Path(file_path) - if not src.is_file(): - raise FileNotFoundError(f"file not found: {src}") - dest_dir = INTERNAL_DECISIONS_DIR / (resolved_district or "other") - dest_dir.mkdir(parents=True, exist_ok=True) - staged = dest_dir / f"{uuid4().hex[:8]}_{_safe_filename(src.name)}" - shutil.copy2(src, staged) - raw_text, page_count, page_offsets = await extractor.extract_text(str(staged)) - raw_text = extractor.strip_nevo_preamble(raw_text or "").strip() - if not raw_text: - raise ValueError("no extractable text in file") - else: - raw_text = (text or "").strip() - if not raw_text: - raise ValueError("text is empty") - page_count = 0 - page_offsets = None - - record = await db.create_internal_committee_decision( - case_number=case_number.strip(), - case_name=(case_name.strip() or case_number.strip()), - full_text=raw_text, - court=court.strip(), - decision_date=_coerce_date(decision_date), - chair_name=chair_name.strip(), - district=resolved_district, - practice_area=practice_area, - appeal_subtype=appeal_subtype.strip(), - subject_tags=list(subject_tags or []), - summary=summary.strip(), - is_binding=is_binding, + """Ingest one appeals-committee decision. Thin wrapper over the canonical pipeline.""" + inputs = { + "case_number": case_number, "case_name": case_name, "court": court, + "decision_date": decision_date, "chair_name": chair_name, "district": district, + "practice_area": practice_area, "appeal_subtype": appeal_subtype, + "subject_tags": subject_tags, "summary": summary, "is_binding": is_binding, + "proceeding_type": proceeding_type, + } + out = await ingest.ingest_document( + _INTERNAL_SPEC, inputs=inputs, file_path=file_path, text=text, document_id=document_id, - proceeding_type=resolved_proc, ) - case_law_id = UUID(str(record["id"])) - - try: - # Parent-doc retrieval (TaskMaster #48) — same gated branch as - # ingest_precedent. Internal committee decisions are typically - # longer than external court rulings (full transcript + ruling), - # so the parent-doc benefit is even larger here. - if config.PARENT_DOC_RETRIEVAL_ENABLED: - h_chunks = chunker.chunk_document_hierarchical( - raw_text, page_offsets=page_offsets, - ) - if not h_chunks: - await db.set_case_law_extraction_status(case_law_id, "completed") - await db.set_case_law_halacha_status(case_law_id, "completed") - return {"status": "completed", "case_law_id": str(case_law_id), "chunks": 0} - children = [c for c in h_chunks if c.role == "child"] - parents = [c for c in h_chunks if c.role == "parent"] - child_vectors = await embeddings.embed_texts( - [c.content for c in children], input_type="document", - ) - chunk_dicts: list[dict] = [] - for p in parents: - chunk_dicts.append({ - "role": "parent", "local_id": p.local_id, "parent_local_id": None, - "chunk_index": p.chunk_index, "content": p.content, - "section_type": p.section_type, "page_number": p.page_number, - "embedding": None, - }) - for c, v in zip(children, child_vectors): - chunk_dicts.append({ - "role": "child", "local_id": c.local_id, - "parent_local_id": c.parent_local_id, - "chunk_index": c.chunk_index, "content": c.content, - "section_type": c.section_type, "page_number": c.page_number, - "embedding": v, - }) - counts = await db.store_precedent_chunks_hierarchical( - case_law_id, chunk_dicts, - ) - stored = counts["children"] - else: - chunks = chunker.chunk_document(raw_text, page_offsets=page_offsets) - if not chunks: - await db.set_case_law_extraction_status(case_law_id, "completed") - await db.set_case_law_halacha_status(case_law_id, "completed") - return {"status": "completed", "case_law_id": str(case_law_id), "chunks": 0} - - chunk_texts = [c.content for c in chunks] - chunk_vectors = await embeddings.embed_texts(chunk_texts, input_type="document") - chunk_dicts = [ - { - "chunk_index": c.chunk_index, - "content": c.content, - "section_type": c.section_type, - "page_number": c.page_number, - "embedding": v, - } - for c, v in zip(chunks, chunk_vectors) - ] - stored = await db.store_precedent_chunks(case_law_id, chunk_dicts) - - await db.set_case_law_extraction_status(case_law_id, "completed") - await db.set_case_law_halacha_status(case_law_id, "pending") - if queue_halachot: - await db.request_halacha_extraction(case_law_id) - - return { - "status": "completed", - "case_law_id": str(case_law_id), - "chunks": stored, - "halachot_pending": True, - } - - except Exception: - logger.exception("ingest_internal_decision failed for %s", case_number) - await db.set_case_law_extraction_status(case_law_id, "failed") - raise + return {"status": out["status"], "case_law_id": out["case_law_id"], + "chunks": out["chunks"], "halachot_pending": True} async def migrate_from_style_corpus(dry_run: bool = False, queue_halachot: bool = True) -> dict: -- 2.49.1 From 3c431403f651c2cd7e6ccdfb52b35a6ff0c2c380 Mon Sep 17 00:00:00 2001 From: Chaim Date: Sat, 30 May 2026 19:26:53 +0000 Subject: [PATCH 8/9] refactor(ingest): drop obsolete queue_halachot flag + dead imports (FU-1 review) pipeline always queues both extraction kinds (INV-ING3); remove the now-meaningless queue_halachot param from ingest_internal_decision and migrate_from_style_corpus. Also trim chunker/extractor/rerank from the precedent_library module-top import (chunking/extraction moved to ingest.py). Co-Authored-By: Claude Sonnet 4.6 --- mcp-server/src/legal_mcp/services/internal_decisions.py | 4 +--- mcp-server/src/legal_mcp/services/precedent_library.py | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/mcp-server/src/legal_mcp/services/internal_decisions.py b/mcp-server/src/legal_mcp/services/internal_decisions.py index 5a0dd4e..898de30 100644 --- a/mcp-server/src/legal_mcp/services/internal_decisions.py +++ b/mcp-server/src/legal_mcp/services/internal_decisions.py @@ -111,7 +111,6 @@ async def ingest_internal_decision( file_path: str | Path | None = None, text: str | None = None, document_id: UUID | None = None, - queue_halachot: bool = True, # retained for signature compat; pipeline always queues proceeding_type: str = "", ) -> dict: """Ingest one appeals-committee decision. Thin wrapper over the canonical pipeline.""" @@ -130,7 +129,7 @@ async def ingest_internal_decision( "chunks": out["chunks"], "halachot_pending": True} -async def migrate_from_style_corpus(dry_run: bool = False, queue_halachot: bool = True) -> dict: +async def migrate_from_style_corpus(dry_run: bool = False) -> dict: """Re-index all style_corpus entries as searchable internal committee decisions. Does NOT delete style_corpus rows — they remain for style analysis. @@ -188,7 +187,6 @@ async def migrate_from_style_corpus(dry_run: bool = False, queue_halachot: bool appeal_subtype=subtype, subject_tags=subject_tags, text=row["full_text"], - queue_halachot=queue_halachot, ) results["ingested"] += 1 logger.info("Migrated style_corpus entry: %s", case_number) diff --git a/mcp-server/src/legal_mcp/services/precedent_library.py b/mcp-server/src/legal_mcp/services/precedent_library.py index 303a1cd..34618bb 100644 --- a/mcp-server/src/legal_mcp/services/precedent_library.py +++ b/mcp-server/src/legal_mcp/services/precedent_library.py @@ -20,7 +20,7 @@ from typing import Awaitable, Callable from uuid import UUID from legal_mcp import config -from legal_mcp.services import chunker, db, embeddings, extractor, hybrid_search, ingest, rerank # noqa: F401 +from legal_mcp.services import db, embeddings, hybrid_search, ingest # noqa: F401 # Note: halacha_extractor and precedent_metadata_extractor are NOT imported # at module load. They are imported lazily inside the dedicated re-extract -- 2.49.1 From 90728ccb3e7796b0648c60a0b0b5881edce11452 Mon Sep 17 00:00:00 2001 From: Chaim Date: Sat, 30 May 2026 19:28:04 +0000 Subject: [PATCH 9/9] docs(spec): FU-1 documented drift notes + mark TaskMaster #59 done Co-Authored-By: Claude Opus 4.8 (1M context) --- .taskmaster/tasks/tasks.json | 10 +++++----- .../specs/2026-05-30-fu1-unified-ingest-design.md | 9 +++++++++ 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/.taskmaster/tasks/tasks.json b/.taskmaster/tasks/tasks.json index f03f2d0..6df01e1 100644 --- a/.taskmaster/tasks/tasks.json +++ b/.taskmaster/tasks/tasks.json @@ -2007,7 +2007,7 @@ "description": "מאחד את ingest_precedent ו-ingest_internal_decision למסלול קנוני יחיד; מבטל את האסימטריות.", "details": "מכסה GAP-01,02,04,05. מספק INV-ING1/ING3/G2/G4. severity: Critical. סוג: קוד. יסוד — FU-2/FU-3/FU-7 תלויים בו. מקור: docs/spec/gap-audit.md + 01-ingest.md.", "testStrategy": "", - "status": "pending", + "status": "done", "dependencies": [], "priority": "high", "subtasks": [ @@ -2017,7 +2017,7 @@ "description": "ביטול שני המסלולים המקבילים (precedent_library.py:88 vs internal_decisions.py:73); כל סוג = פרמטרים, לא פונקציה נפרדת.", "dependencies": [], "details": "INV-ING1/G2", - "status": "pending", + "status": "done", "testStrategy": "", "parentId": "59" }, @@ -2027,7 +2027,7 @@ "description": "קריאה ל-request_metadata_extraction גם במסלול הפנימי (היום רק halacha, internal_decisions.py:208).", "dependencies": [], "details": "INV-ING3/DM1", - "status": "pending", + "status": "done", "testStrategy": "", "parentId": "59" }, @@ -2037,7 +2037,7 @@ "description": "הוספת ולידציית practice_area/source_type למסלול הפנימי (כמו precedent_library.py:131-134).", "dependencies": [], "details": "INV-G4", - "status": "pending", + "status": "done", "testStrategy": "", "parentId": "59" }, @@ -2047,7 +2047,7 @@ "description": "שאר 6 האסימטריות → פרמטרים של המסלול הקנוני (01-ingest §4).", "dependencies": [], "details": "INV-ING1", - "status": "pending", + "status": "done", "testStrategy": "", "parentId": "59" } diff --git a/docs/superpowers/specs/2026-05-30-fu1-unified-ingest-design.md b/docs/superpowers/specs/2026-05-30-fu1-unified-ingest-design.md index 7ed9917..475c664 100644 --- a/docs/superpowers/specs/2026-05-30-fu1-unified-ingest-design.md +++ b/docs/superpowers/specs/2026-05-30-fu1-unified-ingest-design.md @@ -113,6 +113,15 @@ class IntakeSpec: enums לא-חוקיים = **0**; multimodal: 14/56 יש (42 חסר → FU-3 #61.2). הריפקטור משנה רק התנהגות *קדימה*; אינו נוגע בנתונים שמורים. +**Drift מתועד (זניח, מכוון — מסקירת-קוד סופית):** +- **empty-chunks early-return:** כשה-chunker מחזיר ריק על טקסט לא-ריק (נדיר), המקור הציב + `halacha_status=completed` ויצא בלי לתזמן; הקנוני נופל הלאה ומתזמן את שני החילוצים עם + `halacha_status=pending`. עקבי עם INV-ING3 (תיזמון אחיד) — שיפור, לא רגרסיה. +- **thumbnails של multimodal** להחלטות-ועדה יושבים תחת `precedent-library/thumbnails/` + (ממופתח לפי `case_law_id`) — מכוון, מתועד ב-docstring של `spec_thumb_dir`. +- **`queue_halachot`** הוסר כליל (wrapper + `migrate_from_style_corpus`) — הדגל איבד משמעות + תחת INV-ING3; אומת שאין caller שמעביר אותו. + ## 7. אסטרטגיית בדיקה pytest offline עם monkeypatch לכל גבולות-ה-I/O (db, embeddings, chunker, extractor) — כתבנית -- 2.49.1