Files
legal-ai/mcp-server/src/legal_mcp/services/precedent_library.py
Chaim d95a36f310 feat(extraction): precedent metadata via Gemini Flash + scheduled drainer
The /precedents metadata queue was stuck — 24 rows requested, nothing draining
them — and the agentic claude CLI hit error_max_turns on what is a single
structured text→JSON task (slow + flaky). Metadata extraction is bounded
extraction, the wrong fit for an agentic loop.

- gemini_session.py: query_json drop-in (gemini-2.5-flash, JSON mode, httpx —
  no new SDK dep). Reads GEMINI_API_KEY (~/.env; SoT Infisical
  nautilus:/external-apis/gemini). Host-side only — no LLM from the container.
- precedent_metadata_extractor: claude_session.query_json → gemini_session.
  Validated live: rich, accurate fields (case_name/summary/appeal_subtype/tags).
- process_pending_extractions: kind-aware cooldown — metadata 2s (Gemini, fast),
  halacha keeps 30s (Claude rate limits).
- drain_metadata_queue.py + legal-metadata-drain.config.cjs (pm2 cron */15) so
  the queue never clogs again. SCRIPTS.md.
- X8 INV-FP5 updated: per-task engine choice (Gemini=bounded metadata,
  claude_session=agentic halacha), both host-side, single canonical queue (G2).

Agentic/voice-sensitive work (writing, analysis, halacha) stays on claude_session
(Daphna's subscription). Gemini cost ≈ $0.10/1M tokens — negligible.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-08 05:13:49 +00:00

444 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Orchestrator for the External Precedent Library.
Ingest pipeline (one upload):
file → extract_text → proofread → INSERT case_law (source_kind='external_upload')
→ chunk → embed → store precedent_chunks
→ halacha_extractor.extract → embed halachot → store halachot
→ set extraction_status='completed'
Progress is reported via a caller-supplied async callback so the
web layer can pipe updates into the existing Redis ProgressStore /
SSE plumbing without this module knowing about Redis.
"""
from __future__ import annotations
import asyncio
import logging
import os
from pathlib import Path
from typing import Awaitable, Callable
from uuid import UUID
from legal_mcp import config
from legal_mcp.services import db, embeddings, hybrid_search, ingest # noqa: F401
# Note: halacha_extractor and precedent_metadata_extractor are NOT imported
# at module load. They are imported lazily inside the dedicated re-extract
# entry points so that `ingest_precedent` (called from the FastAPI container,
# where `claude` CLI is unavailable) cannot accidentally pull them in. See
# the architectural rule in services/claude_session.py.
logger = logging.getLogger(__name__)
ProgressCb = Callable[[str, int, str], Awaitable[None]]
PRECEDENT_LIBRARY_DIR = Path(config.DATA_DIR) / "precedent-library"
_VALID_PRACTICE_AREAS = frozenset({"", "rishuy_uvniya", "betterment_levy", "compensation_197"})
_VALID_SOURCE_TYPES = frozenset({"", "court_ruling", "appeals_committee"})
_VALID_PRECEDENT_LEVELS = {
"", "עליון", "מנהלי", "ועדת_ערר_ארצית", "ועדת_ערר_מחוזית",
"supreme", "administrative", "national_appeals_committee", "district_appeals_committee",
}
async def _noop_progress(_status: str, _percent: int, _msg: str) -> None:
return None
def _external_validate(inputs: dict) -> None:
citation = (inputs.get("citation") or "").strip()
if not citation:
raise ValueError("citation is required")
if citation.startswith(("ערר ", "ערר(", 'בל"מ ', 'בל"מ(', "ARAR ")):
raise ValueError(
"ציטוט שמתחיל ב-'ערר' או 'בל\"מ' הוא החלטת ועדת ערר. "
"השתמש ב-internal_decision_upload (דורש chair_name + district), "
"לא ב-precedent_library_upload."
)
def _external_staging_subdir(inputs: dict) -> str:
st = inputs.get("source_type") or ""
return st if st in {"court_ruling", "appeals_committee"} else "other"
async def _create_external_record(**kw) -> dict:
"""Adapter: maps canonical inputs (citation) to create_external_case_law(case_number)."""
return await db.create_external_case_law(
case_number=kw["citation"].strip(),
case_name=kw["case_name"],
full_text=kw["full_text"],
court=(kw.get("court") or "").strip(),
decision_date=kw.get("decision_date"),
practice_area=kw.get("practice_area", ""),
appeal_subtype=(kw.get("appeal_subtype") or "").strip(),
subject_tags=list(kw.get("subject_tags") or []),
summary=(kw.get("summary") or "").strip(),
headnote=(kw.get("headnote") or "").strip(),
source_type=kw.get("source_type", ""),
precedent_level=kw.get("precedent_level", ""),
is_binding=kw.get("is_binding", True),
document_id=kw.get("document_id"),
)
_EXTERNAL_SPEC = ingest.IntakeSpec(
source_kind="external_upload",
id_field="citation",
staging_root=PRECEDENT_LIBRARY_DIR,
staging_subdir=_external_staging_subdir,
validate=_external_validate,
enum_fields={"practice_area": _VALID_PRACTICE_AREAS, "source_type": _VALID_SOURCE_TYPES},
derive=lambda inputs: {},
display_name_fallback="citation",
create_record=_create_external_record,
)
async def ingest_precedent(
*,
file_path: str | Path,
citation: str,
case_name: str = "",
court: str = "",
decision_date=None,
source_type: str = "",
precedent_level: str = "",
practice_area: str = "",
appeal_subtype: str = "",
subject_tags: list[str] | None = None,
is_binding: bool = True,
headnote: str = "",
summary: str = "",
document_id: UUID | None = None,
progress: ingest.ProgressCb | None = None,
) -> dict:
"""Ingest one external precedent. Thin wrapper over the canonical pipeline."""
inputs = {
"citation": citation, "case_name": case_name, "court": court,
"decision_date": decision_date, "source_type": source_type,
"precedent_level": precedent_level, "practice_area": practice_area,
"appeal_subtype": appeal_subtype, "subject_tags": subject_tags,
"is_binding": is_binding, "headnote": headnote, "summary": summary,
}
return await ingest.ingest_document(
_EXTERNAL_SPEC, inputs=inputs, file_path=file_path,
document_id=document_id, progress=progress,
)
async def reextract_halachot(
case_law_id: UUID | str,
progress: ProgressCb | None = None,
) -> dict:
"""Re-run the halacha extractor on an existing precedent. Idempotent.
**MCP-tool-only path.** This function calls into ``halacha_extractor``,
which calls ``claude_session`` — the local CLI is required. Invoking
this from the FastAPI container will raise ``Claude CLI not found``.
See the architectural rule in ``services/claude_session.py``.
"""
from legal_mcp.services import halacha_extractor
progress = progress or _noop_progress
if isinstance(case_law_id, str):
case_law_id = UUID(case_law_id)
record = await db.get_case_law(case_law_id)
if not record:
raise ValueError("precedent not found")
# Was restricted to source_kind='external_upload'; opened 2026-05-06 so
# internal_committee rows can also be re-extracted when ingest produced
# bad data. See note in db.request_metadata_extraction.
await progress("extracting_halachot", 50, "מחלץ הלכות מחדש")
# Explicit re-extraction = clean slate (force): wipe prior halachot +
# per-chunk checkpoints and redo all. (Queue draining / resume uses the
# default force=False so an interrupted run continues where it stopped.)
result = await halacha_extractor.extract(case_law_id, force=True)
# Clear the queue timestamp on completion so the UI badge / worker queue
# don't keep showing this row. The queue worker (process_pending_extractions)
# already does this; mirror it here so per-record extraction drains too.
if result.get("status") in ("completed", "no_halachot"):
await db.clear_extraction_request(case_law_id, kind="halacha")
await progress(
"completed",
100,
f"הופקו {result.get('stored', 0)} הלכות (ממתינות לאישור)",
)
return result
# Wait this many seconds between precedents in a multi-precedent run.
# Anthropic rate-limits across the org, so back-to-back extractions of large
# rulings (e.g. 129 chunks for one, then 79 for another) can spill the second
# precedent into a 429 storm. Observed 2026-05-03: 1110/20 succeeded with 9
# halachot, 317/10 immediately after returned silent no_halachot.
INTER_PRECEDENT_COOLDOWN_SEC = 30
# Metadata extraction is on Gemini (fast, high rate limits) — a brief spacer is
# enough; the 30s above is for the Claude-backed halacha path.
METADATA_COOLDOWN_SEC = float(os.environ.get("METADATA_COOLDOWN_SEC", "2"))
# How many times to retry a precedent that came back as 'extraction_failed'
# (i.e. >50% chunks crashed). Each retry uses a longer cooldown.
PRECEDENT_RETRY_ATTEMPTS = 1
PRECEDENT_RETRY_COOLDOWN_SEC = 60
async def process_pending_extractions(kind: str = "metadata", limit: int = 20) -> dict:
"""Drain the extraction queue (UI-button-stamped requests).
The button in the web UI cannot run claude_session itself (it lives in
the container, no CLI). It just stamps ``metadata_extraction_requested_at``
on the row. This function — called from local Claude Code via the MCP
tool — picks each stamped row up, runs the extractor, and clears the
timestamp.
Sequencing: precedents are processed serially (never in parallel) and
each is followed by a short cooldown so the Anthropic rate-limit
counter has time to drain before the next big precedent starts. If
halacha extraction comes back as ``extraction_failed`` we retry the
same precedent once with a longer cooldown — matching the empirical
pattern where the second precedent in a back-to-back run gets
rate-limited but recovers after a brief pause.
Args:
kind: 'metadata' or 'halacha'.
limit: max rows to process this run.
"""
from legal_mcp.services import halacha_extractor, precedent_metadata_extractor
if kind not in {"metadata", "halacha"}:
raise ValueError("kind must be 'metadata' or 'halacha'")
pending = await db.list_pending_extraction_requests(kind=kind, limit=limit)
if not pending:
return {"status": "no_pending", "kind": kind, "processed": 0, "results": []}
async def _run_once(cid: UUID) -> dict:
if kind == "metadata":
return await precedent_metadata_extractor.extract_and_apply(cid)
# Bulk queue-drain → lighter effort (config.HALACHA_BULK_EXTRACT_EFFORT,
# default 'high') to cut wall-clock at scale. Resume (force=False) so an
# interrupted drain continues per-chunk. Single re-extract stays xhigh.
return await halacha_extractor.extract(
cid, effort=config.HALACHA_BULK_EXTRACT_EFFORT,
)
# Metadata extraction runs on Gemini (high rate limits, fast) — the long
# cooldown is only needed for halacha (Claude/Anthropic rate limits).
cooldown = METADATA_COOLDOWN_SEC if kind == "metadata" else INTER_PRECEDENT_COOLDOWN_SEC
results: list[dict] = []
processed = 0
for idx, row in enumerate(pending):
if idx > 0:
await asyncio.sleep(cooldown)
cid = UUID(str(row["id"]))
attempts = 0
result: dict = {}
try:
# Flip to 'processing' so the UI badge shows live progress while
# this row is being worked (metadata has no per-chunk status of
# its own — this is the only signal). Halacha already sets its own
# 'processing' inside the extractor.
if kind == "metadata":
await db.set_case_law_metadata_status(cid, "processing")
result = await _run_once(cid)
# Retry only on systematic extraction failure (rate-limit storm).
# Don't retry on 'no_halachot' — that means Claude looked and
# genuinely found nothing.
while (
result.get("status") == "extraction_failed"
and attempts < PRECEDENT_RETRY_ATTEMPTS
):
attempts += 1
logger.warning(
"process_pending_extractions: %s returned extraction_failed "
"(%d/%d chunks crashed), retry %d/%d after %ds cooldown",
cid,
result.get("failed_chunks", 0),
result.get("total_chunks", 0),
attempts, PRECEDENT_RETRY_ATTEMPTS,
PRECEDENT_RETRY_COOLDOWN_SEC,
)
await asyncio.sleep(PRECEDENT_RETRY_COOLDOWN_SEC)
result = await _run_once(cid)
# Finalise: success or terminal failure both clear the request
# so the queue moves on. (Use 'failed' DB state for terminal
# extraction_failed so the UI shows the warning chip.)
if kind == "halacha":
if result.get("status") == "extraction_failed":
await db.set_case_law_halacha_status(cid, "failed")
await db.clear_extraction_request(cid, kind=kind)
else:
# metadata — set terminal 'completed' status (also clears the
# request timestamp) so the UI badge settles instead of
# lingering on 'processing'.
await db.set_case_law_metadata_status(cid, "completed")
processed += 1
results.append({
"case_law_id": str(cid),
"case_number": row.get("case_number", ""),
"status": result.get("status", "unknown"),
"fields": result.get("fields", []),
"stored": result.get("stored", 0),
"retry_attempts": attempts,
})
except Exception as e:
logger.exception("process_pending_extractions failed for %s: %s", cid, e)
# Don't clear the request — it stays for the next run. But for
# metadata, revert the badge from 'processing' back to 'pending'
# (the timestamp is preserved) so the row shows "בתור" rather than
# a stuck "מחלץ" until the retry picks it up.
if kind == "metadata":
try:
await db.set_case_law_metadata_status(cid, "pending")
except Exception:
logger.exception("failed to revert metadata status for %s", cid)
results.append({
"case_law_id": str(cid),
"case_number": row.get("case_number", ""),
"status": "failed",
"error": str(e),
"retry_attempts": attempts,
})
return {
"status": "completed",
"kind": kind,
"processed": processed,
"total_pending": len(pending),
"results": results,
}
async def reextract_metadata(
case_law_id: UUID | str,
progress: ProgressCb | None = None,
) -> dict:
"""Re-run metadata extraction on an existing precedent.
Only fills empty fields (subject_tags, summary, headnote, key_quote,
appeal_subtype, and case_name when it equals the citation). User
values are preserved.
**MCP-tool-only path** — same constraint as :func:`reextract_halachot`.
"""
from legal_mcp.services import precedent_metadata_extractor
progress = progress or _noop_progress
if isinstance(case_law_id, str):
case_law_id = UUID(case_law_id)
record = await db.get_case_law(case_law_id)
if not record:
raise ValueError("precedent not found")
# See note in db.request_metadata_extraction — opened to all source kinds.
# Mark 'processing' so a concurrent UI poll shows the live badge.
await db.set_case_law_metadata_status(case_law_id, "processing")
await progress("extracting_metadata", 40, "מחלץ מטא-דאטה (תקציר, תגיות)")
result = await precedent_metadata_extractor.extract_and_apply(case_law_id)
# Settle to terminal 'completed' (also NULLs the queue timestamp) so the
# UI / worker stop showing this row. See note in reextract_halachot.
if result.get("status") in ("completed", "no_changes"):
await db.set_case_law_metadata_status(case_law_id, "completed")
else:
# e.g. 'no_metadata' (no full_text) — don't leave the badge stuck on
# 'processing'; revert to 'pending' (preserves any queue timestamp).
await db.set_case_law_metadata_status(case_law_id, "pending")
fields = result.get("fields") or []
msg = (
f"מולאו {len(fields)} שדות: {', '.join(fields)}"
if fields
else "לא נמצא מה למלא (כל השדות מאוכלסים או לא ניתן לחלץ)"
)
await progress("completed", 100, msg)
return result
async def delete_precedent(case_law_id: UUID | str) -> bool:
"""Delete a precedent and cascade chunks + halachot."""
if isinstance(case_law_id, str):
case_law_id = UUID(case_law_id)
return await db.delete_case_law(case_law_id)
async def get_precedent(case_law_id: UUID | str) -> dict | None:
"""Get a precedent with its halachot and related cases attached."""
if isinstance(case_law_id, str):
case_law_id = UUID(case_law_id)
record = await db.get_case_law(case_law_id)
if not record:
return None
record["halachot"] = await db.list_halachot(case_law_id=case_law_id, limit=500)
record["related_cases"] = await db.get_case_law_relations(case_law_id)
return record
async def list_precedents(
practice_area: str = "",
court: str = "",
precedent_level: str = "",
source_type: str = "",
search: str = "",
source_kind: str = "external_upload",
limit: int = 100,
offset: int = 0,
) -> list[dict]:
return await db.list_external_case_law(
practice_area=practice_area,
court=court,
precedent_level=precedent_level,
source_type=source_type,
search=search,
source_kind=source_kind,
limit=limit,
offset=offset,
)
async def search_library(
query: str,
practice_area: str = "",
court: str = "",
precedent_level: str = "",
appeal_subtype: str = "",
is_binding: bool | None = None,
subject_tag: str = "",
limit: int = 10,
include_halachot: bool = True,
) -> list[dict]:
"""Semantic search merging halachot (rule-level) and chunks (passage-level).
Only ``approved`` / ``published`` halachot are returned, per chair-review
policy. Chunks are returned regardless of halacha review status.
When ``VOYAGE_RERANK_ENABLED`` is set, results are passed through
voyage rerank-2 (cross-encoder). The +0.05 halacha boost from
``search_precedent_library_semantic`` is preserved before rerank
but the rerank scores ultimately decide the order.
"""
if not query.strip():
return []
query_vec = await embeddings.embed_query(query)
return await hybrid_search.search_precedent_library_hybrid(
query=query,
query_text_embedding=query_vec,
limit=limit,
practice_area=practice_area,
court=court,
precedent_level=precedent_level,
appeal_subtype=appeal_subtype,
is_binding=is_binding,
subject_tag=subject_tag,
include_halachot=include_halachot,
)