"""Orchestrator for the External Precedent Library. Ingest pipeline (one upload): file → extract_text → proofread → INSERT case_law (source_kind='external_upload') → chunk → embed → store precedent_chunks → halacha_extractor.extract → embed halachot → store halachot → set extraction_status='completed' Progress is reported via a caller-supplied async callback so the web layer can pipe updates into the existing Redis ProgressStore / SSE plumbing without this module knowing about Redis. """ from __future__ import annotations import asyncio import logging import re import shutil from datetime import date from pathlib import Path from typing import Awaitable, Callable from uuid import UUID, uuid4 from legal_mcp import config from legal_mcp.services import chunker, db, embeddings, extractor, hybrid_search, rerank # noqa: F401 # Note: halacha_extractor and precedent_metadata_extractor are NOT imported # at module load. They are imported lazily inside the dedicated re-extract # entry points so that `ingest_precedent` (called from the FastAPI container, # where `claude` CLI is unavailable) cannot accidentally pull them in. See # the architectural rule in services/claude_session.py. 
logger = logging.getLogger(__name__)

# Progress callback signature: (status, percent, human-readable message).
ProgressCb = Callable[[str, int, str], Awaitable[None]]

# Root directory for staged precedent uploads and their page thumbnails.
PRECEDENT_LIBRARY_DIR = Path(config.DATA_DIR) / "precedent-library"

# Allowed values for the corresponding ingest arguments ("" = unspecified).
_VALID_PRACTICE_AREAS = {"", "rishuy_uvniya", "betterment_levy", "compensation_197"}
_VALID_SOURCE_TYPES = {"", "court_ruling", "appeals_committee"}
# Both the Hebrew labels and their English equivalents are accepted.
_VALID_PRECEDENT_LEVELS = {
    "",
    "עליון",
    "מנהלי",
    "ועדת_ערר_ארצית",
    "ועדת_ערר_מחוזית",
    "supreme",
    "administrative",
    "national_appeals_committee",
    "district_appeals_committee",
}


async def _noop_progress(_status: str, _percent: int, _msg: str) -> None:
    """Default progress callback that discards every update."""
    return None


def _safe_filename(name: str) -> str:
    """Strip path separators and unsafe chars from a user-provided name.

    Only the final path component is kept; any character outside word
    characters, ``.``, ``-``, ``+``, Hebrew letters and space becomes ``_``.
    Falls back to a random ``upload-XXXXXXXX`` name if nothing is left.
    """
    base = Path(name).name
    return re.sub(r"[^\w.\-+א-ת ]", "_", base) or f"upload-{uuid4().hex[:8]}"


def _stage_file(src_path: Path, source_type: str) -> Path:
    """Copy the uploaded file into ``data/precedent-library/<source_type>/``.

    Unknown/empty source types go into the ``other/`` subdirectory. The copy
    gets a random 8-hex-char prefix so repeated uploads never collide.

    Returns the destination path. Source file is not deleted (caller decides).
    """
    sub = source_type if source_type in {"court_ruling", "appeals_committee"} else "other"
    dest_dir = PRECEDENT_LIBRARY_DIR / sub
    dest_dir.mkdir(parents=True, exist_ok=True)
    safe_name = _safe_filename(src_path.name)
    dest = dest_dir / f"{uuid4().hex[:8]}_{safe_name}"
    shutil.copy2(src_path, dest)
    return dest


def _discard_staged(path: Path) -> None:
    """Best-effort removal of a staged copy that never made it into the DB."""
    try:
        path.unlink(missing_ok=True)
    except OSError as e:
        logger.warning("Could not remove orphaned staged file %s: %s", path, e)


def _coerce_date(value) -> date | None:
    """Best-effort conversion of *value* to a plain ``date``.

    Accepts ``None``/``""`` (-> ``None``), a ``date``, a ``datetime`` (narrowed
    to its date part), or a string whose first 10 non-whitespace characters
    are an ISO ``YYYY-MM-DD`` date. Anything unparseable yields ``None``.
    """
    from datetime import datetime

    if value is None or value == "":
        return None
    # ``datetime`` is a subclass of ``date``; check it first so callers always
    # get a plain ``date`` back, as the return annotation promises.
    if isinstance(value, datetime):
        return value.date()
    if isinstance(value, date):
        return value
    if isinstance(value, str):
        try:
            return date.fromisoformat(value.strip()[:10])
        except ValueError:
            return None
    return None


async def ingest_precedent(
    *,
    file_path: str | Path,
    citation: str,
    case_name: str = "",
    court: str = "",
    decision_date=None,
    source_type: str = "",
    precedent_level: str = "",
    practice_area: str = "",
    appeal_subtype: str = "",
    subject_tags: list[str] | None = None,
    is_binding: bool = True,
    headnote: str = "",
    summary: str = "",
    document_id: UUID | None = None,
    progress: ProgressCb | None = None,
) -> dict:
    """Ingest a single uploaded precedent through the full pipeline.

    Required: file_path + citation. Everything else has a sensible default.

    Raises:
        FileNotFoundError: ``file_path`` is not an existing file.
        ValueError: empty citation, an invalid ``practice_area`` /
            ``source_type`` / ``precedent_level``, or no extractable text.
        Exception: anything raised by extraction, the DB, chunking or
            embedding is re-raised after a "failed" progress update.

    Returns:
        ``{"status": "...", "case_law_id": "...", "chunks": N, "halachot": M}``
        (the success path also includes ``halachot_pending``,
        ``metadata_filled`` and ``pages``).
    """
    progress = progress or _noop_progress
    src = Path(file_path)
    if not src.is_file():
        raise FileNotFoundError(f"file not found: {src}")
    if not citation.strip():
        raise ValueError("citation is required")
    if practice_area not in _VALID_PRACTICE_AREAS:
        raise ValueError(f"invalid practice_area: {practice_area!r}")
    if source_type not in _VALID_SOURCE_TYPES:
        raise ValueError(f"invalid source_type: {source_type!r}")
    if precedent_level not in _VALID_PRECEDENT_LEVELS:
        raise ValueError(f"invalid precedent_level: {precedent_level!r}")

    await progress("staging", 5, "מעתיק את הקובץ לאחסון")
    staged = _stage_file(src, source_type)

    await progress("extracting", 15, "מחלץ טקסט מהקובץ")
    try:
        text, page_count, page_offsets = await extractor.extract_text(str(staged))
    except Exception as e:
        # No DB row references the staged copy yet, so don't leave it behind.
        _discard_staged(staged)
        await progress("failed", 100, f"כשל בחילוץ טקסט: {e}")
        raise

    text = (text or "").strip()
    if text:
        # Strip any Nevo preamble that might wrap court rulings downloaded
        # from Nevo. Done before the emptiness check below so that a file
        # containing *only* a preamble is rejected instead of being stored
        # with empty text.
        text = extractor.strip_nevo_preamble(text)
    if not (text or "").strip():
        _discard_staged(staged)
        await progress("failed", 100, "לא נמצא טקסט בקובץ")
        raise ValueError("no extractable text in file")

    await progress("storing_metadata", 25, "שומר את הפסיקה במסד הנתונים")
    try:
        record = await db.create_external_case_law(
            case_number=citation.strip(),
            case_name=case_name.strip() or citation.strip(),
            full_text=text,
            court=court.strip(),
            decision_date=_coerce_date(decision_date),
            practice_area=practice_area,
            appeal_subtype=appeal_subtype.strip(),
            subject_tags=list(subject_tags or []),
            summary=summary.strip(),
            headnote=headnote.strip(),
            source_type=source_type,
            precedent_level=precedent_level,
            is_binding=is_binding,
            document_id=document_id,
        )
    except Exception as e:
        _discard_staged(staged)
        await progress("failed", 100, f"כשל בעיבוד: {e}")
        raise
    case_law_id = UUID(str(record["id"]))

    try:
        await progress("chunking", 40, f"מחלק את הטקסט ל-chunks ({page_count} עמ')")
        chunks = chunker.chunk_document(text, page_offsets=page_offsets)
        if not chunks:
            await db.set_case_law_extraction_status(case_law_id, "completed")
            await db.set_case_law_halacha_status(case_law_id, "completed")
            await progress("completed", 100, "אין טקסט לעיבוד")
            return {
                "status": "completed",
                "case_law_id": str(case_law_id),
                "chunks": 0,
                "halachot": 0,
            }

        await progress("embedding", 55, f"מייצר embeddings ל-{len(chunks)} chunks")
        chunk_texts = [c.content for c in chunks]
        chunk_vectors = await embeddings.embed_texts(chunk_texts, input_type="document")
        # zip() would silently drop chunks if the embedding service returned
        # fewer vectors than inputs; fail loudly instead (handled below).
        if len(chunk_vectors) != len(chunks):
            raise RuntimeError(
                f"embedding count mismatch: {len(chunk_vectors)} vectors "
                f"for {len(chunks)} chunks"
            )
        chunk_dicts = [
            {
                "chunk_index": c.chunk_index,
                "content": c.content,
                "section_type": c.section_type,
                "page_number": c.page_number,
                "embedding": v,
            }
            for c, v in zip(chunks, chunk_vectors)
        ]
        stored_chunks = await db.store_precedent_chunks(case_law_id, chunk_dicts)

        # Multimodal page-image embeddings (V9). Gated by feature flag.
        # Non-fatal: text path already succeeded. Only PDFs.
        if config.MULTIMODAL_ENABLED and page_count > 0 and staged.suffix.lower() == ".pdf":
            try:
                await progress(
                    "embedding_images",
                    70,
                    f"מטמיע {page_count} עמודי תמונה (multimodal)",
                )
                await _embed_precedent_pages(case_law_id, staged, page_count)
            except Exception as e:
                logger.warning("Precedent multimodal embedding failed (non-fatal): %s", e)

        # Pipeline split: the container does the non-LLM half (extract +
        # chunk + embed + store). LLM-driven extraction (metadata, halachot)
        # runs separately via the MCP tool `precedent_process_pending` from
        # local Claude Code, where `claude` CLI is available.
        #
        # We auto-queue both extractions so the chair doesn't need to click
        # any button — the moment they (or I) run `precedent_process_pending`
        # in chat, both kinds get processed.
        await db.set_case_law_extraction_status(case_law_id, "completed")
        await db.set_case_law_halacha_status(case_law_id, "pending")
        await db.request_metadata_extraction(case_law_id)
        await db.request_halacha_extraction(case_law_id)

        await progress(
            "completed",
            100,
            f"הוכנס לספרייה: {stored_chunks} chunks. "
            f"חילוץ הלכות ומטא-דאטה ממתינים בתור — "
            f"להפעיל מ-Claude Code: precedent_process_pending.",
        )
        return {
            "status": "completed",
            "case_law_id": str(case_law_id),
            "chunks": stored_chunks,
            "halachot": 0,
            "halachot_pending": True,
            "metadata_filled": [],
            "pages": page_count,
        }
    except Exception as e:
        logger.exception("precedent_library.ingest_precedent failed: %s", e)
        # Don't let a failing status update mask the original error.
        try:
            await db.set_case_law_extraction_status(case_law_id, "failed")
        except Exception:
            logger.exception(
                "precedent_library.ingest_precedent: could not mark %s as failed",
                case_law_id,
            )
        await progress("failed", 100, f"כשל בעיבוד: {e}")
        raise


async def reextract_halachot(
    case_law_id: UUID | str,
    progress: ProgressCb | None = None,
) -> dict:
    """Re-run the halacha extractor on an existing precedent. Idempotent.

    **MCP-tool-only path.** This function calls into ``halacha_extractor``,
    which calls ``claude_session`` — the local CLI is required. Invoking this
    from the FastAPI container will raise ``Claude CLI not found``. See the
    architectural rule in ``services/claude_session.py``.

    Raises:
        ValueError: the precedent does not exist (or the id is not a UUID).
    """
    from legal_mcp.services import halacha_extractor

    progress = progress or _noop_progress
    if isinstance(case_law_id, str):
        case_law_id = UUID(case_law_id)
    record = await db.get_case_law(case_law_id)
    if not record:
        raise ValueError("precedent not found")
    # Was restricted to source_kind='external_upload'; opened 2026-05-06 so
    # internal_committee rows can also be re-extracted when ingest produced
    # bad data. See note in db.request_metadata_extraction.

    await progress("extracting_halachot", 50, "מחלץ הלכות מחדש")
    result = await halacha_extractor.extract(case_law_id)
    await progress(
        "completed",
        100,
        f"הופקו {result.get('stored', 0)} הלכות (ממתינות לאישור)",
    )
    return result


# Wait this many seconds between precedents in a multi-precedent run.
# Anthropic rate-limits across the org, so back-to-back extractions of large
# rulings (e.g. 129 chunks for one, then 79 for another) can spill the second
# precedent into a 429 storm. Observed 2026-05-03: 1110/20 succeeded with 9
# halachot, 317/10 immediately after returned silent no_halachot.
INTER_PRECEDENT_COOLDOWN_SEC = 30

# How many times to retry a precedent that came back as 'extraction_failed'
# (i.e. >50% chunks crashed). Each retry uses a longer cooldown.
PRECEDENT_RETRY_ATTEMPTS = 1
PRECEDENT_RETRY_COOLDOWN_SEC = 60


async def process_pending_extractions(kind: str = "metadata", limit: int = 20) -> dict:
    """Drain the extraction queue (UI-button-stamped requests).

    The button in the web UI cannot run claude_session itself (it lives in
    the container, no CLI). It just stamps ``metadata_extraction_requested_at``
    on the row. This function — called from local Claude Code via the MCP
    tool — picks each stamped row up, runs the extractor, and clears the
    timestamp.

    Sequencing: precedents are processed serially (never in parallel) and
    each is followed by a short cooldown so the Anthropic rate-limit counter
    has time to drain before the next big precedent starts.

    If an extraction comes back as ``extraction_failed`` we retry the same
    precedent (up to ``PRECEDENT_RETRY_ATTEMPTS`` times) after a longer
    cooldown — matching the empirical pattern where the second precedent in
    a back-to-back run gets rate-limited but recovers after a brief pause.

    Args:
        kind: 'metadata' or 'halacha'.
        limit: max rows to process this run.

    Returns:
        A summary dict with ``status``, ``kind``, ``processed`` and a
        per-precedent ``results`` list (plus ``total_pending`` when rows
        were found). Rows whose extractor raised keep their request stamp so
        the next run picks them up again.
    """
    from legal_mcp.services import halacha_extractor, precedent_metadata_extractor

    if kind not in {"metadata", "halacha"}:
        raise ValueError("kind must be 'metadata' or 'halacha'")

    pending = await db.list_pending_extraction_requests(kind=kind, limit=limit)
    if not pending:
        return {"status": "no_pending", "kind": kind, "processed": 0, "results": []}

    async def _run_once(cid: UUID) -> dict:
        # Dispatch to the extractor matching the requested queue.
        if kind == "metadata":
            return await precedent_metadata_extractor.extract_and_apply(cid)
        return await halacha_extractor.extract(cid)

    results: list[dict] = []
    processed = 0
    for idx, row in enumerate(pending):
        if idx > 0:
            await asyncio.sleep(INTER_PRECEDENT_COOLDOWN_SEC)
        cid = UUID(str(row["id"]))
        attempts = 0
        result: dict = {}
        try:
            result = await _run_once(cid)
            # Retry only on systematic extraction failure (rate-limit storm).
            # Don't retry on 'no_halachot' — that means Claude looked and
            # genuinely found nothing.
            while (
                result.get("status") == "extraction_failed"
                and attempts < PRECEDENT_RETRY_ATTEMPTS
            ):
                attempts += 1
                logger.warning(
                    "process_pending_extractions: %s returned extraction_failed "
                    "(%d/%d chunks crashed), retry %d/%d after %ds cooldown",
                    cid,
                    result.get("failed_chunks", 0),
                    result.get("total_chunks", 0),
                    attempts,
                    PRECEDENT_RETRY_ATTEMPTS,
                    PRECEDENT_RETRY_COOLDOWN_SEC,
                )
                await asyncio.sleep(PRECEDENT_RETRY_COOLDOWN_SEC)
                result = await _run_once(cid)

            # Finalise: success or terminal failure both clear the request
            # so the queue moves on. (Use 'failed' DB state for terminal
            # extraction_failed so the UI shows the warning chip.)
            if kind == "halacha" and result.get("status") == "extraction_failed":
                await db.set_case_law_halacha_status(cid, "failed")
            await db.clear_extraction_request(cid, kind=kind)
            processed += 1
            results.append({
                "case_law_id": str(cid),
                "case_number": row.get("case_number", ""),
                "status": result.get("status", "unknown"),
                "fields": result.get("fields", []),
                "stored": result.get("stored", 0),
                "retry_attempts": attempts,
            })
        except Exception as e:
            logger.exception("process_pending_extractions failed for %s: %s", cid, e)
            results.append({
                "case_law_id": str(cid),
                "case_number": row.get("case_number", ""),
                "status": "failed",
                "error": str(e),
                "retry_attempts": attempts,
            })
            # Don't clear the request — it stays for the next run.

    return {
        "status": "completed",
        "kind": kind,
        "processed": processed,
        "total_pending": len(pending),
        "results": results,
    }


async def reextract_metadata(
    case_law_id: UUID | str,
    progress: ProgressCb | None = None,
) -> dict:
    """Re-run metadata extraction on an existing precedent.

    Only fills empty fields (subject_tags, summary, headnote, key_quote,
    appeal_subtype, and case_name when it equals the citation). User values
    are preserved.

    **MCP-tool-only path** — same constraint as :func:`reextract_halachot`.

    Raises:
        ValueError: the precedent does not exist (or the id is not a UUID).
    """
    from legal_mcp.services import precedent_metadata_extractor

    progress = progress or _noop_progress
    if isinstance(case_law_id, str):
        case_law_id = UUID(case_law_id)
    record = await db.get_case_law(case_law_id)
    if not record:
        raise ValueError("precedent not found")
    # See note in db.request_metadata_extraction — opened to all source kinds.

    await progress("extracting_metadata", 40, "מחלץ מטא-דאטה (תקציר, תגיות)")
    result = await precedent_metadata_extractor.extract_and_apply(case_law_id)
    fields = result.get("fields") or []
    msg = (
        f"מולאו {len(fields)} שדות: {', '.join(fields)}"
        if fields
        else "לא נמצא מה למלא (כל השדות מאוכלסים או לא ניתן לחלץ)"
    )
    await progress("completed", 100, msg)
    return result


async def delete_precedent(case_law_id: UUID | str) -> bool:
    """Delete a precedent and cascade chunks + halachot."""
    if isinstance(case_law_id, str):
        case_law_id = UUID(case_law_id)
    return await db.delete_case_law(case_law_id)


async def get_precedent(case_law_id: UUID | str) -> dict | None:
    """Get a precedent with (up to 500 of) its halachot attached.

    Returns ``None`` when no such precedent exists.
    """
    if isinstance(case_law_id, str):
        case_law_id = UUID(case_law_id)
    record = await db.get_case_law(case_law_id)
    if not record:
        return None
    record["halachot"] = await db.list_halachot(case_law_id=case_law_id, limit=500)
    return record


async def list_precedents(
    practice_area: str = "",
    court: str = "",
    precedent_level: str = "",
    source_type: str = "",
    search: str = "",
    limit: int = 100,
    offset: int = 0,
) -> list[dict]:
    """List library precedents via ``db.list_external_case_law``.

    All filters are forwarded unchanged; ``limit``/``offset`` paginate.
    """
    return await db.list_external_case_law(
        practice_area=practice_area,
        court=court,
        precedent_level=precedent_level,
        source_type=source_type,
        search=search,
        limit=limit,
        offset=offset,
    )


async def search_library(
    query: str,
    practice_area: str = "",
    court: str = "",
    precedent_level: str = "",
    appeal_subtype: str = "",
    is_binding: bool | None = None,
    subject_tag: str = "",
    limit: int = 10,
    include_halachot: bool = True,
) -> list[dict]:
    """Semantic search merging halachot (rule-level) and chunks (passage-level).

    Only ``approved`` / ``published`` halachot are returned, per chair-review
    policy. Chunks are returned regardless of halacha review status.

    When ``VOYAGE_RERANK_ENABLED`` is set, results are passed through voyage
    rerank-2 (cross-encoder). The +0.05 halacha boost from
    ``search_precedent_library_semantic`` is preserved before rerank but the
    rerank scores ultimately decide the order.

    A blank/whitespace-only query returns ``[]`` without embedding anything.
    """
    if not query.strip():
        return []
    query_vec = await embeddings.embed_query(query)
    return await hybrid_search.search_precedent_library_hybrid(
        query=query,
        query_text_embedding=query_vec,
        limit=limit,
        practice_area=practice_area,
        court=court,
        precedent_level=precedent_level,
        appeal_subtype=appeal_subtype,
        is_binding=is_binding,
        subject_tag=subject_tag,
        include_halachot=include_halachot,
    )


async def _embed_precedent_pages(
    case_law_id: UUID,
    pdf_path: Path,
    page_count: int,
) -> dict:
    """Render precedent PDF pages → embed via voyage-multimodal → store.

    Thumbnails go to
    ``data/precedent-library/thumbnails/{case_law_id}/p{N:03d}.jpg``.
    Page numbers are assigned 1..N in render order. ``page_count`` is not
    used here; the number of rendered pages decides how many are stored.

    Returns ``{"pages_embedded": <rows stored>}``.
    """
    thumb_dir = PRECEDENT_LIBRARY_DIR / "thumbnails" / str(case_law_id)
    # Rendering is blocking work, so keep it off the event loop.
    rendered = await asyncio.to_thread(
        extractor.render_pages_for_multimodal,
        pdf_path,
        config.MULTIMODAL_DPI,
        config.MULTIMODAL_THUMB_DPI,
        thumb_dir,
    )
    images = [pil for pil, _ in rendered]
    thumbs = [t for _, t in rendered]
    img_embs = await embeddings.embed_images(images)

    page_records = []
    for i, (emb, thumb) in enumerate(zip(img_embs, thumbs)):
        rel_thumb = None
        if thumb is not None:
            # Store thumbnails relative to DATA_DIR when possible so the path
            # survives a data-dir relocation; fall back to the raw path.
            try:
                rel_thumb = str(thumb.relative_to(config.DATA_DIR))
            except ValueError:
                rel_thumb = str(thumb)
        page_records.append({
            "page_number": i + 1,
            "embedding": emb,
            "image_thumbnail_path": rel_thumb,
        })

    stored = await db.store_precedent_image_embeddings(
        case_law_id,
        page_records,
        model_name=config.MULTIMODAL_MODEL,
    )
    logger.info(
        "Multimodal: stored %d page-image embeddings for case_law %s",
        stored,
        case_law_id,
    )
    return {"pages_embedded": stored}