"""Orchestrator for the External Precedent Library. Ingest pipeline (one upload): file → extract_text → proofread → INSERT case_law (source_kind='external_upload') → chunk → embed → store precedent_chunks → halacha_extractor.extract → embed halachot → store halachot → set extraction_status='completed' Progress is reported via a caller-supplied async callback so the web layer can pipe updates into the existing Redis ProgressStore / SSE plumbing without this module knowing about Redis. """ from __future__ import annotations import asyncio import logging import os from pathlib import Path from typing import Awaitable, Callable from uuid import UUID from legal_mcp import config from legal_mcp.services import db, embeddings, hybrid_search, ingest # noqa: F401 # Note: halacha_extractor and precedent_metadata_extractor are NOT imported # at module load. They are imported lazily inside the dedicated re-extract # entry points so that `ingest_precedent` (called from the FastAPI container, # where `claude` CLI is unavailable) cannot accidentally pull them in. See # the architectural rule in services/claude_session.py. 
logger = logging.getLogger(__name__)

# Progress callback signature: (status, percent, message) -> awaitable.
ProgressCb = Callable[[str, int, str], Awaitable[None]]

# Staging root for uploaded precedent files, under the configured DATA_DIR.
PRECEDENT_LIBRARY_DIR = Path(config.DATA_DIR) / "precedent-library"

# Allowed values for enum-like intake fields; "" means "unspecified".
_VALID_PRACTICE_AREAS = frozenset({"", "rishuy_uvniya", "betterment_levy", "compensation_197"})
_VALID_SOURCE_TYPES = frozenset({"", "court_ruling", "appeals_committee"})
# Hebrew and English spellings of the same four court / committee levels.
# A frozenset, like the two constants above, so this module-level value cannot
# be mutated by accident (membership tests behave exactly as with a set).
# NOTE(review): this set is not passed to _EXTERNAL_SPEC.enum_fields below, so
# the intake spec in this module does not enforce it — confirm that is intended.
_VALID_PRECEDENT_LEVELS = frozenset({
    "",
    "עליון",
    "מנהלי",
    "ועדת_ערר_ארצית",
    "ועדת_ערר_מחוזית",
    "supreme",
    "administrative",
    "national_appeals_committee",
    "district_appeals_committee",
})


async def _noop_progress(_status: str, _percent: int, _msg: str) -> None:
    """Default progress callback: discard every update."""
    return None


def _external_validate(inputs: dict) -> None:
    """Reject inputs that cannot be ingested as an external precedent.

    Raises:
        ValueError: if ``citation`` is missing or blank, or if it starts with
            a prefix that (per the error message) marks an appeals-committee
            decision, which must go through internal_decision_upload instead.
    """
    citation = (inputs.get("citation") or "").strip()
    if not citation:
        raise ValueError("citation is required")
    if citation.startswith(("ערר ", "ערר(", 'בל"מ ', 'בל"מ(', "ARAR ")):
        raise ValueError(
            "ציטוט שמתחיל ב-'ערר' או 'בל\"מ' הוא החלטת ועדת ערר. "
            "השתמש ב-internal_decision_upload (דורש chair_name + district), "
            "לא ב-precedent_library_upload."
        )


def _external_staging_subdir(inputs: dict) -> str:
    """Return the staging sub-directory: the known source_type, else "other"."""
    st = inputs.get("source_type") or ""
    return st if st in {"court_ruling", "appeals_committee"} else "other"


async def _create_external_record(**kw) -> dict:
    """Adapter: maps canonical inputs (citation) to create_external_case_law(case_number).

    Free-text fields are stripped; ``subject_tags`` is copied into a new list
    (``None`` becomes ``[]``). ``case_name`` and ``full_text`` are required keys.
    """
    return await db.create_external_case_law(
        case_number=kw["citation"].strip(),
        case_name=kw["case_name"],
        full_text=kw["full_text"],
        court=(kw.get("court") or "").strip(),
        decision_date=kw.get("decision_date"),
        practice_area=kw.get("practice_area", ""),
        appeal_subtype=(kw.get("appeal_subtype") or "").strip(),
        subject_tags=list(kw.get("subject_tags") or []),
        summary=(kw.get("summary") or "").strip(),
        headnote=(kw.get("headnote") or "").strip(),
        source_type=kw.get("source_type", ""),
        precedent_level=kw.get("precedent_level", ""),
        is_binding=kw.get("is_binding", True),
        document_id=kw.get("document_id"),
    )


# Intake spec for the canonical ingest pipeline (ingest.ingest_document).
_EXTERNAL_SPEC = ingest.IntakeSpec(
    source_kind="external_upload",
    id_field="citation",
    staging_root=PRECEDENT_LIBRARY_DIR,
    staging_subdir=_external_staging_subdir,
    validate=_external_validate,
    enum_fields={"practice_area": _VALID_PRACTICE_AREAS, "source_type": _VALID_SOURCE_TYPES},
    derive=lambda inputs: {},
    display_name_fallback="citation",
    create_record=_create_external_record,
)


async def ingest_precedent(
    *,
    file_path: str | Path,
    citation: str,
    case_name: str = "",
    court: str = "",
    decision_date=None,
    source_type: str = "",
    precedent_level: str = "",
    practice_area: str = "",
    appeal_subtype: str = "",
    subject_tags: list[str] | None = None,
    is_binding: bool = True,
    headnote: str = "",
    summary: str = "",
    document_id: UUID | None = None,
    progress: ingest.ProgressCb | None = None,
) -> dict:
    """Ingest one external precedent. Thin wrapper over the canonical pipeline.

    Packs the keyword arguments into the canonical ``inputs`` dict and hands
    them to ``ingest.ingest_document`` together with ``_EXTERNAL_SPEC``; the
    return value is whatever that call returns.
    """
    inputs = {
        "citation": citation,
        "case_name": case_name,
        "court": court,
        "decision_date": decision_date,
        "source_type": source_type,
        "precedent_level": precedent_level,
        "practice_area": practice_area,
        "appeal_subtype": appeal_subtype,
        "subject_tags": subject_tags,
        "is_binding": is_binding,
        "headnote": headnote,
        "summary": summary,
    }
    return await ingest.ingest_document(
        _EXTERNAL_SPEC,
        inputs=inputs,
        file_path=file_path,
        document_id=document_id,
        progress=progress,
    )


async def reextract_halachot(
    case_law_id: UUID | str,
    progress: ProgressCb | None = None,
) -> dict:
    """Re-run the halacha extractor on an existing precedent. Idempotent.

    Chair-approved / published halachot are PRESERVED across the re-extract
    (INV-G10) — only un-reviewed rows are replaced. See
    ``db.reset_halacha_extraction`` / TaskMaster #108.

    **MCP-tool-only path.** This function calls into ``halacha_extractor``,
    which calls ``claude_session`` — the local CLI is required. Invoking this
    from the FastAPI container will raise ``Claude CLI not found``. See the
    architectural rule in ``services/claude_session.py``.

    Raises:
        ValueError: if ``case_law_id`` does not match an existing record (or,
            for a string id, is not a valid UUID).
    """
    from legal_mcp.services import halacha_extractor

    progress = progress or _noop_progress
    if isinstance(case_law_id, str):
        case_law_id = UUID(case_law_id)
    record = await db.get_case_law(case_law_id)
    if not record:
        raise ValueError("precedent not found")
    # Was restricted to source_kind='external_upload'; opened 2026-05-06 so
    # internal_committee rows can also be re-extracted when ingest produced
    # bad data. See note in db.request_metadata_extraction.

    await progress("extracting_halachot", 50, "מחלץ הלכות מחדש")
    # Explicit re-extraction = clean slate (force): drop un-reviewed halachot +
    # clear per-chunk checkpoints and redo all, but PRESERVE chair-approved /
    # published rows (INV-G10; dedup-on-insert avoids duplicating them). (Queue
    # draining / resume uses force=False so an interrupted run continues.)
    result = await halacha_extractor.extract(case_law_id, force=True)
    # Clear the queue timestamp on completion so the UI badge / worker queue
    # don't keep showing this row. The queue worker (process_pending_extractions)
    # already does this; mirror it here so per-record extraction drains too.
    if result.get("status") in ("completed", "no_halachot"):
        await db.clear_extraction_request(case_law_id, kind="halacha")
    await progress(
        "completed",
        100,
        f"הופקו {result.get('stored', 0)} הלכות (ממתינות לאישור)",
    )
    return result


# Wait this many seconds between precedents in a multi-precedent run.
# Anthropic rate-limits across the org, so back-to-back extractions of large
# rulings (e.g. 129 chunks for one, then 79 for another) can spill the second
# precedent into a 429 storm. Observed 2026-05-03: 1110/20 succeeded with 9
# halachot, 317/10 immediately after returned silent no_halachot.
INTER_PRECEDENT_COOLDOWN_SEC = 30

# Metadata extraction is on Gemini (fast, high rate limits) — a brief spacer is
# enough; the 30s above is for the Claude-backed halacha path.
METADATA_COOLDOWN_SEC = float(os.environ.get("METADATA_COOLDOWN_SEC", "2"))

# How many times to retry a precedent that came back as 'extraction_failed'
# (i.e. >50% chunks crashed). Each retry waits PRECEDENT_RETRY_COOLDOWN_SEC
# first — longer than the normal inter-precedent spacer.
PRECEDENT_RETRY_ATTEMPTS = 1
PRECEDENT_RETRY_COOLDOWN_SEC = 60


async def process_pending_extractions(kind: str = "metadata", limit: int = 20) -> dict:
    """Drain the extraction queue (UI-button-stamped requests).

    The button in the web UI cannot run claude_session itself (it lives in
    the container, no CLI). It just stamps ``metadata_extraction_requested_at``
    on the row. This function — called from local Claude Code via the MCP
    tool — picks each stamped row up, runs the extractor, and clears the
    timestamp.

    Sequencing: precedents are processed serially (never in parallel) and
    each is followed by a short cooldown so the Anthropic rate-limit counter
    has time to drain before the next big precedent starts. If halacha
    extraction comes back as ``extraction_failed`` we retry the same
    precedent once with a longer cooldown — matching the empirical pattern
    where the second precedent in a back-to-back run gets rate-limited but
    recovers after a brief pause.

    Args:
        kind: 'metadata' or 'halacha'.
        limit: max rows to process this run.

    Returns:
        ``{"status": "no_pending", "kind", "processed": 0, "results": []}``
        when nothing is queued; otherwise ``{"status": "completed", "kind",
        "processed", "total_pending", "results"}`` with one result entry per
        attempted row (failed rows carry ``"status": "failed"`` and ``"error"``).

    Raises:
        ValueError: if ``kind`` is not 'metadata' or 'halacha'.
    """
    # Validate before the lazy import so a bad ``kind`` fails fast without
    # loading the CLI-backed extractor modules.
    if kind not in {"metadata", "halacha"}:
        raise ValueError("kind must be 'metadata' or 'halacha'")

    from legal_mcp.services import halacha_extractor, precedent_metadata_extractor

    # Self-heal stale 'processing' rows (fully unattended): a drain that crashed
    # mid-extraction can leave a row status='processing' with its requested_at
    # cleared — orphaned, so it would never be re-picked. Re-stamp it so it
    # re-drains (the halacha extractor uses force=False → resumes from chunk
    # checkpoints, no duplicates). Safe under the global advisory lock (only one
    # drain runs at a time). Mirrors the digests-drain self-heal.
    healed = await db.requeue_stale_processing_extractions(kind=kind)
    if healed:
        logger.warning("self-healed %d stale '%s' processing row(s)", healed, kind)

    pending = await db.list_pending_extraction_requests(kind=kind, limit=limit)
    if not pending:
        return {"status": "no_pending", "kind": kind, "processed": 0, "results": []}

    async def _run_once(cid: UUID) -> dict:
        """Run the extractor selected by ``kind`` once for one row."""
        if kind == "metadata":
            return await precedent_metadata_extractor.extract_and_apply(cid)
        # Bulk queue-drain → lighter effort (config.HALACHA_BULK_EXTRACT_EFFORT)
        # to cut wall-clock at scale. No force flag is passed, so (per the
        # original design) an interrupted drain resumes per-chunk. The single
        # re-extract path (reextract_halachot) uses the extractor's default effort.
        return await halacha_extractor.extract(
            cid,
            effort=config.HALACHA_BULK_EXTRACT_EFFORT,
        )

    # Metadata extraction runs on Gemini (high rate limits, fast) — the long
    # cooldown is only needed for halacha (Claude/Anthropic rate limits).
    cooldown = METADATA_COOLDOWN_SEC if kind == "metadata" else INTER_PRECEDENT_COOLDOWN_SEC

    results: list[dict] = []
    processed = 0
    for idx, row in enumerate(pending):
        if idx > 0:
            await asyncio.sleep(cooldown)
        cid = UUID(str(row["id"]))
        attempts = 0
        result: dict = {}
        try:
            # Flip to 'processing' so the UI badge shows live progress while
            # this row is being worked (metadata has no per-chunk status of
            # its own — this is the only signal). Halacha already sets its own
            # 'processing' inside the extractor.
            if kind == "metadata":
                await db.set_case_law_metadata_status(cid, "processing")
            result = await _run_once(cid)
            # Retry only on systematic extraction failure (rate-limit storm).
            # Don't retry on 'no_halachot' — that means Claude looked and
            # genuinely found nothing.
            while (
                result.get("status") == "extraction_failed"
                and attempts < PRECEDENT_RETRY_ATTEMPTS
            ):
                attempts += 1
                logger.warning(
                    "process_pending_extractions: %s returned extraction_failed "
                    "(%d/%d chunks crashed), retry %d/%d after %ds cooldown",
                    cid,
                    result.get("failed_chunks", 0),
                    result.get("total_chunks", 0),
                    attempts,
                    PRECEDENT_RETRY_ATTEMPTS,
                    PRECEDENT_RETRY_COOLDOWN_SEC,
                )
                await asyncio.sleep(PRECEDENT_RETRY_COOLDOWN_SEC)
                result = await _run_once(cid)
            # Finalise: success or terminal failure both clear the request
            # so the queue moves on. (Use 'failed' DB state for terminal
            # extraction_failed so the UI shows the warning chip.)
            if kind == "halacha":
                if result.get("status") == "extraction_failed":
                    await db.set_case_law_halacha_status(cid, "failed")
                await db.clear_extraction_request(cid, kind=kind)
            else:
                # metadata — set terminal 'completed' status (also clears the
                # request timestamp) so the UI badge settles instead of
                # lingering on 'processing'.
                await db.set_case_law_metadata_status(cid, "completed")
            processed += 1
            results.append({
                "case_law_id": str(cid),
                "case_number": row.get("case_number", ""),
                "status": result.get("status", "unknown"),
                "fields": result.get("fields", []),
                "stored": result.get("stored", 0),
                "retry_attempts": attempts,
            })
        except Exception as e:
            logger.exception("process_pending_extractions failed for %s: %s", cid, e)
            # Don't clear the request — it stays for the next run. But for
            # metadata, revert the badge from 'processing' back to 'pending'
            # (the timestamp is preserved) so the row shows as queued rather
            # than stuck on "extracting" until the retry picks it up.
            if kind == "metadata":
                try:
                    await db.set_case_law_metadata_status(cid, "pending")
                except Exception:
                    logger.exception("failed to revert metadata status for %s", cid)
            results.append({
                "case_law_id": str(cid),
                "case_number": row.get("case_number", ""),
                "status": "failed",
                "error": str(e),
                "retry_attempts": attempts,
            })

    return {
        "status": "completed",
        "kind": kind,
        "processed": processed,
        "total_pending": len(pending),
        "results": results,
    }


async def reextract_metadata(
    case_law_id: UUID | str,
    progress: ProgressCb | None = None,
) -> dict:
    """Re-run metadata extraction on an existing precedent.

    Only fills empty fields (subject_tags, summary, headnote, key_quote,
    appeal_subtype, and case_name when it equals the citation). User values
    are preserved.

    **MCP-tool-only path** — same constraint as :func:`reextract_halachot`.

    Raises:
        ValueError: if ``case_law_id`` does not match an existing record (or,
            for a string id, is not a valid UUID).
        Exception: anything raised by the extractor or the progress callback
            is re-raised after the metadata status is reverted to 'pending'.
    """
    from legal_mcp.services import precedent_metadata_extractor

    progress = progress or _noop_progress
    if isinstance(case_law_id, str):
        case_law_id = UUID(case_law_id)
    record = await db.get_case_law(case_law_id)
    if not record:
        raise ValueError("precedent not found")
    # See note in db.request_metadata_extraction — opened to all source kinds.

    # Mark 'processing' so a concurrent UI poll shows the live badge.
    await db.set_case_law_metadata_status(case_law_id, "processing")
    try:
        await progress("extracting_metadata", 40, "מחלץ מטא-דאטה (תקציר, תגיות)")
        result = await precedent_metadata_extractor.extract_and_apply(case_law_id)
    except Exception:
        # Previously an exception here left the badge stuck on 'processing'.
        # Revert to 'pending' (mirrors the error path of
        # process_pending_extractions) and propagate the original error.
        try:
            await db.set_case_law_metadata_status(case_law_id, "pending")
        except Exception:
            logger.exception("failed to revert metadata status for %s", case_law_id)
        raise

    # Settle to terminal 'completed' (also NULLs the queue timestamp) so the
    # UI / worker stop showing this row. See note in reextract_halachot.
    if result.get("status") in ("completed", "no_changes"):
        await db.set_case_law_metadata_status(case_law_id, "completed")
    else:
        # e.g. 'no_metadata' (no full_text) — don't leave the badge stuck on
        # 'processing'; revert to 'pending' (preserves any queue timestamp).
        await db.set_case_law_metadata_status(case_law_id, "pending")
    fields = result.get("fields") or []
    msg = (
        f"מולאו {len(fields)} שדות: {', '.join(fields)}"
        if fields
        else "לא נמצא מה למלא (כל השדות מאוכלסים או לא ניתן לחלץ)"
    )
    await progress("completed", 100, msg)
    return result


async def delete_precedent(case_law_id: UUID | str) -> bool:
    """Delete a precedent and cascade chunks + halachot.

    Returns whatever ``db.delete_case_law`` returns.
    """
    if isinstance(case_law_id, str):
        case_law_id = UUID(case_law_id)
    return await db.delete_case_law(case_law_id)


async def get_precedent(case_law_id: UUID | str) -> dict | None:
    """Get a precedent with its halachot and related cases attached.

    Returns ``None`` when no record exists. Attaches up to 500 halachot under
    ``"halachot"`` and the relations under ``"related_cases"``.
    """
    if isinstance(case_law_id, str):
        case_law_id = UUID(case_law_id)
    record = await db.get_case_law(case_law_id)
    if not record:
        return None
    record["halachot"] = await db.list_halachot(case_law_id=case_law_id, limit=500)
    record["related_cases"] = await db.get_case_law_relations(case_law_id)
    return record


async def list_precedents(
    practice_area: str = "",
    court: str = "",
    precedent_level: str = "",
    source_type: str = "",
    search: str = "",
    source_kind: str = "external_upload",
    limit: int = 100,
    offset: int = 0,
) -> list[dict]:
    """List precedents via ``db.list_external_case_law``.

    All filters are forwarded unchanged; ``source_kind`` defaults to
    'external_upload'. ``limit`` / ``offset`` page the result.
    """
    return await db.list_external_case_law(
        practice_area=practice_area,
        court=court,
        precedent_level=precedent_level,
        source_type=source_type,
        search=search,
        source_kind=source_kind,
        limit=limit,
        offset=offset,
    )


async def search_library(
    query: str,
    practice_area: str = "",
    court: str = "",
    precedent_level: str = "",
    appeal_subtype: str = "",
    is_binding: bool | None = None,
    subject_tag: str = "",
    limit: int = 10,
    include_halachot: bool = True,
) -> list[dict]:
    """Semantic search merging halachot (rule-level) and chunks (passage-level).

    Only ``approved`` / ``published`` halachot are returned, per chair-review
    policy. Chunks are returned regardless of halacha review status.

    When ``VOYAGE_RERANK_ENABLED`` is set, results are passed through
    voyage rerank-2 (cross-encoder). The +0.05 halacha boost from
    ``search_precedent_library_semantic`` is preserved before rerank but the
    rerank scores ultimately decide the order.

    A blank / whitespace-only ``query`` returns ``[]`` without embedding.
    Otherwise the query is embedded and the search is delegated to
    ``hybrid_search.search_precedent_library_hybrid``.
    """
    if not query.strip():
        return []
    query_vec = await embeddings.embed_query(query)
    return await hybrid_search.search_precedent_library_hybrid(
        query=query,
        query_text_embedding=query_vec,
        limit=limit,
        practice_area=practice_area,
        court=court,
        precedent_level=precedent_level,
        appeal_subtype=appeal_subtype,
        is_binding=is_binding,
        subject_tag=subject_tag,
        include_halachot=include_halachot,
    )