From d7eb1b28247c432ae4b98de0bb1067fdb8de65ff Mon Sep 17 00:00:00 2001 From: Chaim Date: Sat, 30 May 2026 19:16:29 +0000 Subject: [PATCH] refactor(ingest): ingest_precedent delegates to canonical pipeline (FU-1) Co-Authored-By: Claude Opus 4.8 (1M context) --- .../legal_mcp/services/precedent_library.py | 351 +++--------------- 1 file changed, 60 insertions(+), 291 deletions(-) diff --git a/mcp-server/src/legal_mcp/services/precedent_library.py b/mcp-server/src/legal_mcp/services/precedent_library.py index 43958c2..303a1cd 100644 --- a/mcp-server/src/legal_mcp/services/precedent_library.py +++ b/mcp-server/src/legal_mcp/services/precedent_library.py @@ -15,15 +15,12 @@ from __future__ import annotations import asyncio import logging -import re -import shutil -from datetime import date from pathlib import Path from typing import Awaitable, Callable -from uuid import UUID, uuid4 +from uuid import UUID from legal_mcp import config -from legal_mcp.services import chunker, db, embeddings, extractor, hybrid_search, rerank # noqa: F401 +from legal_mcp.services import chunker, db, embeddings, extractor, hybrid_search, ingest, rerank # noqa: F401 # Note: halacha_extractor and precedent_metadata_extractor are NOT imported # at module load. They are imported lazily inside the dedicated re-extract @@ -40,8 +37,8 @@ ProgressCb = Callable[[str, int, str], Awaitable[None]] PRECEDENT_LIBRARY_DIR = Path(config.DATA_DIR) / "precedent-library" -_VALID_PRACTICE_AREAS = {"", "rishuy_uvniya", "betterment_levy", "compensation_197"} -_VALID_SOURCE_TYPES = {"", "court_ruling", "appeals_committee"} +_VALID_PRACTICE_AREAS = frozenset({"", "rishuy_uvniya", "betterment_levy", "compensation_197"}) +_VALID_SOURCE_TYPES = frozenset({"", "court_ruling", "appeals_committee"}) _VALID_PRECEDENT_LEVELS = { "", "עליון", "מנהלי", "ועדת_ערר_ארצית", "ועדת_ערר_מחוזית", "supreme", "administrative", "national_appeals_committee", "district_appeals_committee", @@ -52,37 +49,54 @@ async def _noop_progress(_status: str, _percent: int, _msg: str) -> None: return None -def _safe_filename(name: str) -> str: - """Strip path separators and unsafe chars from a user-provided name.""" - base = Path(name).name - return re.sub(r"[^\w.\-+א-ת ]", "_", base) or f"upload-{uuid4().hex[:8]}" +def _external_validate(inputs: dict) -> None: + citation = (inputs.get("citation") or "").strip() + if not citation: + raise ValueError("citation is required") + if citation.startswith(("ערר ", "ערר(", 'בל"מ ', 'בל"מ(', "ARAR ")): + raise ValueError( + "ציטוט שמתחיל ב-'ערר' או 'בל\"מ' הוא החלטת ועדת ערר. " + "השתמש ב-internal_decision_upload (דורש chair_name + district), " + "לא ב-precedent_library_upload." + ) -def _stage_file(src_path: Path, source_type: str) -> Path: - """Copy the uploaded file into data/precedent-library//. - - Returns the destination path. Source file is not deleted (caller decides). - """ - sub = source_type if source_type in {"court_ruling", "appeals_committee"} else "other" - dest_dir = PRECEDENT_LIBRARY_DIR / sub - dest_dir.mkdir(parents=True, exist_ok=True) - safe_name = _safe_filename(src_path.name) - dest = dest_dir / f"{uuid4().hex[:8]}_{safe_name}" - shutil.copy2(src_path, dest) - return dest +def _external_staging_subdir(inputs: dict) -> str: + st = inputs.get("source_type") or "" + return st if st in {"court_ruling", "appeals_committee"} else "other" -def _coerce_date(value) -> date | None: - if value is None or value == "": - return None - if isinstance(value, date): - return value - if isinstance(value, str): - try: - return date.fromisoformat(value[:10]) - except ValueError: - return None - return None +async def _create_external_record(**kw) -> dict: + """Adapter: maps canonical inputs (citation) to create_external_case_law(case_number).""" + return await db.create_external_case_law( + case_number=kw["citation"].strip(), + case_name=kw["case_name"], + full_text=kw["full_text"], + court=(kw.get("court") or "").strip(), + decision_date=kw.get("decision_date"), + practice_area=kw.get("practice_area", ""), + appeal_subtype=(kw.get("appeal_subtype") or "").strip(), + subject_tags=list(kw.get("subject_tags") or []), + summary=(kw.get("summary") or "").strip(), + headnote=(kw.get("headnote") or "").strip(), + source_type=kw.get("source_type", ""), + precedent_level=kw.get("precedent_level", ""), + is_binding=kw.get("is_binding", True), + document_id=kw.get("document_id"), + ) + + +_EXTERNAL_SPEC = ingest.IntakeSpec( + source_kind="external_upload", + id_field="citation", + staging_root=PRECEDENT_LIBRARY_DIR, + staging_subdir=_external_staging_subdir, + validate=_external_validate, + enum_fields={"practice_area": _VALID_PRACTICE_AREAS, "source_type": _VALID_SOURCE_TYPES}, + derive=lambda inputs: {}, + display_name_fallback="citation", + create_record=_create_external_record, +) async def ingest_precedent( @@ -101,220 +115,20 @@ async def ingest_precedent( headnote: str = "", summary: str = "", document_id: UUID | None = None, - progress: ProgressCb | None = None, + progress: ingest.ProgressCb | None = None, ) -> dict: - """Ingest a single uploaded precedent through the full pipeline. - - Required: file_path + citation. Everything else has a sensible default. - - Returns: - ``{"status": "...", "case_law_id": "...", "chunks": N, "halachot": M}`` - """ - progress = progress or _noop_progress - src = Path(file_path) - if not src.is_file(): - raise FileNotFoundError(f"file not found: {src}") - if not citation.strip(): - raise ValueError("citation is required") - # Citation guard at service level (catches both MCP and HTTP API paths). - # Appeals-committee decisions must go through ingest_internal_decision - # which records chair_name+district. The MCP wrapper has the same guard - # for an earlier, friendlier error message — but this is the source of - # truth. See TaskMaster #30(ב) and DB constraint case_law_external_arar_check. - _norm = citation.strip() - if _norm.startswith(("ערר ", "ערר(", "בל\"מ ", "בל\"מ(", "ARAR ")): - raise ValueError( - "ציטוט שמתחיל ב-'ערר' או 'בל\"מ' הוא החלטת ועדת ערר. " - "השתמש ב-internal_decision_upload (דורש chair_name + district), " - "לא ב-precedent_library_upload." - ) - if practice_area not in _VALID_PRACTICE_AREAS: - raise ValueError(f"invalid practice_area: {practice_area!r}") - if source_type not in _VALID_SOURCE_TYPES: - raise ValueError(f"invalid source_type: {source_type!r}") - - await progress("staging", 5, "מעתיק את הקובץ לאחסון") - - staged = _stage_file(src, source_type) - - await progress("extracting", 15, "מחלץ טקסט מהקובץ") - try: - text, page_count, page_offsets = await extractor.extract_text(str(staged)) - except Exception as e: - await progress("failed", 100, f"כשל בחילוץ טקסט: {e}") - raise - - text = (text or "").strip() - if not text: - await progress("failed", 100, "לא נמצא טקסט בקובץ") - raise ValueError("no extractable text in file") - - # Strip any Nevo preamble that might wrap court rulings downloaded from Nevo. - text = extractor.strip_nevo_preamble(text) - - await progress("storing_metadata", 25, "שומר את הפסיקה במסד הנתונים") - record = await db.create_external_case_law( - case_number=citation.strip(), - case_name=case_name.strip() or citation.strip(), - full_text=text, - court=court.strip(), - decision_date=_coerce_date(decision_date), - practice_area=practice_area, - appeal_subtype=appeal_subtype.strip(), - subject_tags=list(subject_tags or []), - summary=summary.strip(), - headnote=headnote.strip(), - source_type=source_type, - precedent_level=precedent_level, - is_binding=is_binding, - document_id=document_id, + """Ingest one external precedent. Thin wrapper over the canonical pipeline.""" + inputs = { + "citation": citation, "case_name": case_name, "court": court, + "decision_date": decision_date, "source_type": source_type, + "precedent_level": precedent_level, "practice_area": practice_area, + "appeal_subtype": appeal_subtype, "subject_tags": subject_tags, + "is_binding": is_binding, "headnote": headnote, "summary": summary, + } + return await ingest.ingest_document( + _EXTERNAL_SPEC, inputs=inputs, file_path=file_path, + document_id=document_id, progress=progress, ) - case_law_id = UUID(str(record["id"])) - - try: - # Parent-doc retrieval (TaskMaster #48): when enabled, emit - # two tiers (parents + children). Only children are embedded - # and indexed; parents carry retrieval context. When disabled, - # fall back to legacy single-tier chunking — identical - # behaviour to pre-V17. - if config.PARENT_DOC_RETRIEVAL_ENABLED: - await progress( - "chunking", 40, - f"מחלק את הטקסט ל-chunks היררכיים ({page_count} עמ')", - ) - h_chunks = chunker.chunk_document_hierarchical( - text, page_offsets=page_offsets, - ) - if not h_chunks: - await db.set_case_law_extraction_status(case_law_id, "completed") - await db.set_case_law_halacha_status(case_law_id, "completed") - await progress("completed", 100, "אין טקסט לעיבוד") - return { - "status": "completed", - "case_law_id": str(case_law_id), - "chunks": 0, - "halachot": 0, - } - - children = [c for c in h_chunks if c.role == "child"] - parents = [c for c in h_chunks if c.role == "parent"] - await progress( - "embedding", 55, - f"מייצר embeddings ל-{len(children)} children " - f"({len(parents)} parents)", - ) - child_texts = [c.content for c in children] - child_vectors = await embeddings.embed_texts( - child_texts, input_type="document", - ) - # Build flat dict list for the two-pass writer. - chunk_dicts: list[dict] = [] - for p in parents: - chunk_dicts.append({ - "role": "parent", - "local_id": p.local_id, - "parent_local_id": None, - "chunk_index": p.chunk_index, - "content": p.content, - "section_type": p.section_type, - "page_number": p.page_number, - "embedding": None, - }) - for c, v in zip(children, child_vectors): - chunk_dicts.append({ - "role": "child", - "local_id": c.local_id, - "parent_local_id": c.parent_local_id, - "chunk_index": c.chunk_index, - "content": c.content, - "section_type": c.section_type, - "page_number": c.page_number, - "embedding": v, - }) - counts = await db.store_precedent_chunks_hierarchical( - case_law_id, chunk_dicts, - ) - stored_chunks = counts["children"] - else: - await progress( - "chunking", 40, f"מחלק את הטקסט ל-chunks ({page_count} עמ')", - ) - chunks = chunker.chunk_document(text, page_offsets=page_offsets) - if not chunks: - await db.set_case_law_extraction_status(case_law_id, "completed") - await db.set_case_law_halacha_status(case_law_id, "completed") - await progress("completed", 100, "אין טקסט לעיבוד") - return { - "status": "completed", - "case_law_id": str(case_law_id), - "chunks": 0, - "halachot": 0, - } - - await progress("embedding", 55, f"מייצר embeddings ל-{len(chunks)} chunks") - chunk_texts = [c.content for c in chunks] - chunk_vectors = await embeddings.embed_texts(chunk_texts, input_type="document") - - chunk_dicts = [ - { - "chunk_index": c.chunk_index, - "content": c.content, - "section_type": c.section_type, - "page_number": c.page_number, - "embedding": v, - } - for c, v in zip(chunks, chunk_vectors) - ] - stored_chunks = await db.store_precedent_chunks(case_law_id, chunk_dicts) - - # Multimodal page-image embeddings (V9). Gated by feature flag. - # Non-fatal: text path already succeeded. Only PDFs. - if config.MULTIMODAL_ENABLED and page_count > 0 and staged.suffix.lower() == ".pdf": - try: - await progress( - "embedding_images", 70, - f"מטמיע {page_count} עמודי תמונה (multimodal)", - ) - await _embed_precedent_pages(case_law_id, staged, page_count) - except Exception as e: - logger.warning("Precedent multimodal embedding failed (non-fatal): %s", e) - - # Pipeline split: the container does the non-LLM half (extract + - # chunk + embed + store). LLM-driven extraction (metadata, halachot) - # runs separately via the MCP tool `precedent_process_pending` from - # local Claude Code, where `claude` CLI is available. - # - # We auto-queue both extractions so the chair doesn't need to click - # any button — the moment they (or me) run `precedent_process_pending` - # in chat, both kinds get processed. - await db.set_case_law_extraction_status(case_law_id, "completed") - await db.set_case_law_halacha_status(case_law_id, "pending") - await db.request_metadata_extraction(case_law_id) - await db.request_halacha_extraction(case_law_id) - - await progress( - "completed", - 100, - f"הוכנס לספרייה: {stored_chunks} chunks. " - f"חילוץ הלכות ומטא-דאטה ממתינים בתור — " - f"להפעיל מ-Claude Code: precedent_process_pending.", - ) - - return { - "status": "completed", - "case_law_id": str(case_law_id), - "chunks": stored_chunks, - "halachot": 0, - "halachot_pending": True, - "metadata_filled": [], - "pages": page_count, - } - - except Exception as e: - logger.exception("precedent_library.ingest_precedent failed: %s", e) - await db.set_case_law_extraction_status(case_law_id, "failed") - await progress("failed", 100, f"כשל בעיבוד: {e}") - raise async def reextract_halachot( @@ -586,48 +400,3 @@ async def search_library( subject_tag=subject_tag, include_halachot=include_halachot, ) - - -async def _embed_precedent_pages( - case_law_id: UUID, - pdf_path: Path, - page_count: int, -) -> dict: - """Render precedent PDF pages → embed via voyage-multimodal → store. - - Thumbnails go to - ``data/precedent-library/thumbnails/{case_law_id}/p{N:03d}.jpg``. - """ - thumb_dir = PRECEDENT_LIBRARY_DIR / "thumbnails" / str(case_law_id) - rendered = await asyncio.to_thread( - extractor.render_pages_for_multimodal, - pdf_path, - config.MULTIMODAL_DPI, - config.MULTIMODAL_THUMB_DPI, - thumb_dir, - ) - images = [pil for pil, _ in rendered] - thumbs = [t for _, t in rendered] - img_embs = await embeddings.embed_images(images) - - page_records = [] - for i, (emb, thumb) in enumerate(zip(img_embs, thumbs)): - rel_thumb = None - if thumb is not None: - try: - rel_thumb = str(thumb.relative_to(config.DATA_DIR)) - except ValueError: - rel_thumb = str(thumb) - page_records.append({ - "page_number": i + 1, - "embedding": emb, - "image_thumbnail_path": rel_thumb, - }) - stored = await db.store_precedent_image_embeddings( - case_law_id, page_records, model_name=config.MULTIMODAL_MODEL, - ) - logger.info( - "Multimodal: stored %d page-image embeddings for case_law %s", - stored, case_law_id, - ) - return {"pages_embedded": stored}