refactor(ingest): ingest_precedent delegates to canonical pipeline (FU-1)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-30 19:16:29 +00:00
parent be4f7bbe99
commit d7eb1b2824

View File

@@ -15,15 +15,12 @@ from __future__ import annotations
import asyncio
import logging
import re
import shutil
from datetime import date
from pathlib import Path
from typing import Awaitable, Callable
from uuid import UUID, uuid4
from uuid import UUID
from legal_mcp import config
from legal_mcp.services import chunker, db, embeddings, extractor, hybrid_search, rerank # noqa: F401
from legal_mcp.services import chunker, db, embeddings, extractor, hybrid_search, ingest, rerank # noqa: F401
# Note: halacha_extractor and precedent_metadata_extractor are NOT imported
# at module load. They are imported lazily inside the dedicated re-extract
@@ -40,8 +37,8 @@ ProgressCb = Callable[[str, int, str], Awaitable[None]]
PRECEDENT_LIBRARY_DIR = Path(config.DATA_DIR) / "precedent-library"
_VALID_PRACTICE_AREAS = {"", "rishuy_uvniya", "betterment_levy", "compensation_197"}
_VALID_SOURCE_TYPES = {"", "court_ruling", "appeals_committee"}
_VALID_PRACTICE_AREAS = frozenset({"", "rishuy_uvniya", "betterment_levy", "compensation_197"})
_VALID_SOURCE_TYPES = frozenset({"", "court_ruling", "appeals_committee"})
_VALID_PRECEDENT_LEVELS = {
"", "עליון", "מנהלי", "ועדת_ערר_ארצית", "ועדת_ערר_מחוזית",
"supreme", "administrative", "national_appeals_committee", "district_appeals_committee",
@@ -52,37 +49,54 @@ async def _noop_progress(_status: str, _percent: int, _msg: str) -> None:
return None
def _safe_filename(name: str) -> str:
"""Strip path separators and unsafe chars from a user-provided name."""
base = Path(name).name
return re.sub(r"[^\w.\-+א-ת ]", "_", base) or f"upload-{uuid4().hex[:8]}"
def _external_validate(inputs: dict) -> None:
citation = (inputs.get("citation") or "").strip()
if not citation:
raise ValueError("citation is required")
if citation.startswith(("ערר ", "ערר(", 'בל"מ ', 'בל"מ(', "ARAR ")):
raise ValueError(
"ציטוט שמתחיל ב-'ערר' או 'בל\"מ' הוא החלטת ועדת ערר. "
"השתמש ב-internal_decision_upload (דורש chair_name + district), "
"לא ב-precedent_library_upload."
)
def _stage_file(src_path: Path, source_type: str) -> Path:
"""Copy the uploaded file into data/precedent-library/<source_type>/.
Returns the destination path. Source file is not deleted (caller decides).
"""
sub = source_type if source_type in {"court_ruling", "appeals_committee"} else "other"
dest_dir = PRECEDENT_LIBRARY_DIR / sub
dest_dir.mkdir(parents=True, exist_ok=True)
safe_name = _safe_filename(src_path.name)
dest = dest_dir / f"{uuid4().hex[:8]}_{safe_name}"
shutil.copy2(src_path, dest)
return dest
def _external_staging_subdir(inputs: dict) -> str:
st = inputs.get("source_type") or ""
return st if st in {"court_ruling", "appeals_committee"} else "other"
def _coerce_date(value) -> date | None:
if value is None or value == "":
return None
if isinstance(value, date):
return value
if isinstance(value, str):
try:
return date.fromisoformat(value[:10])
except ValueError:
return None
return None
async def _create_external_record(**kw) -> dict:
"""Adapter: maps canonical inputs (citation) to create_external_case_law(case_number)."""
return await db.create_external_case_law(
case_number=kw["citation"].strip(),
case_name=kw["case_name"],
full_text=kw["full_text"],
court=(kw.get("court") or "").strip(),
decision_date=kw.get("decision_date"),
practice_area=kw.get("practice_area", ""),
appeal_subtype=(kw.get("appeal_subtype") or "").strip(),
subject_tags=list(kw.get("subject_tags") or []),
summary=(kw.get("summary") or "").strip(),
headnote=(kw.get("headnote") or "").strip(),
source_type=kw.get("source_type", ""),
precedent_level=kw.get("precedent_level", ""),
is_binding=kw.get("is_binding", True),
document_id=kw.get("document_id"),
)
_EXTERNAL_SPEC = ingest.IntakeSpec(
source_kind="external_upload",
id_field="citation",
staging_root=PRECEDENT_LIBRARY_DIR,
staging_subdir=_external_staging_subdir,
validate=_external_validate,
enum_fields={"practice_area": _VALID_PRACTICE_AREAS, "source_type": _VALID_SOURCE_TYPES},
derive=lambda inputs: {},
display_name_fallback="citation",
create_record=_create_external_record,
)
async def ingest_precedent(
@@ -101,220 +115,20 @@ async def ingest_precedent(
headnote: str = "",
summary: str = "",
document_id: UUID | None = None,
progress: ProgressCb | None = None,
progress: ingest.ProgressCb | None = None,
) -> dict:
"""Ingest a single uploaded precedent through the full pipeline.
Required: file_path + citation. Everything else has a sensible default.
Returns:
``{"status": "...", "case_law_id": "...", "chunks": N, "halachot": M}``
"""
progress = progress or _noop_progress
src = Path(file_path)
if not src.is_file():
raise FileNotFoundError(f"file not found: {src}")
if not citation.strip():
raise ValueError("citation is required")
# Citation guard at service level (catches both MCP and HTTP API paths).
# Appeals-committee decisions must go through ingest_internal_decision
# which records chair_name+district. The MCP wrapper has the same guard
# for an earlier, friendlier error message — but this is the source of
# truth. See TaskMaster #30(ב) and DB constraint case_law_external_arar_check.
_norm = citation.strip()
if _norm.startswith(("ערר ", "ערר(", "בל\"מ ", "בל\"מ(", "ARAR ")):
raise ValueError(
"ציטוט שמתחיל ב-'ערר' או 'בל\"מ' הוא החלטת ועדת ערר. "
"השתמש ב-internal_decision_upload (דורש chair_name + district), "
"לא ב-precedent_library_upload."
)
if practice_area not in _VALID_PRACTICE_AREAS:
raise ValueError(f"invalid practice_area: {practice_area!r}")
if source_type not in _VALID_SOURCE_TYPES:
raise ValueError(f"invalid source_type: {source_type!r}")
await progress("staging", 5, "מעתיק את הקובץ לאחסון")
staged = _stage_file(src, source_type)
await progress("extracting", 15, "מחלץ טקסט מהקובץ")
try:
text, page_count, page_offsets = await extractor.extract_text(str(staged))
except Exception as e:
await progress("failed", 100, f"כשל בחילוץ טקסט: {e}")
raise
text = (text or "").strip()
if not text:
await progress("failed", 100, "לא נמצא טקסט בקובץ")
raise ValueError("no extractable text in file")
# Strip any Nevo preamble that might wrap court rulings downloaded from Nevo.
text = extractor.strip_nevo_preamble(text)
await progress("storing_metadata", 25, "שומר את הפסיקה במסד הנתונים")
record = await db.create_external_case_law(
case_number=citation.strip(),
case_name=case_name.strip() or citation.strip(),
full_text=text,
court=court.strip(),
decision_date=_coerce_date(decision_date),
practice_area=practice_area,
appeal_subtype=appeal_subtype.strip(),
subject_tags=list(subject_tags or []),
summary=summary.strip(),
headnote=headnote.strip(),
source_type=source_type,
precedent_level=precedent_level,
is_binding=is_binding,
document_id=document_id,
"""Ingest one external precedent. Thin wrapper over the canonical pipeline."""
inputs = {
"citation": citation, "case_name": case_name, "court": court,
"decision_date": decision_date, "source_type": source_type,
"precedent_level": precedent_level, "practice_area": practice_area,
"appeal_subtype": appeal_subtype, "subject_tags": subject_tags,
"is_binding": is_binding, "headnote": headnote, "summary": summary,
}
return await ingest.ingest_document(
_EXTERNAL_SPEC, inputs=inputs, file_path=file_path,
document_id=document_id, progress=progress,
)
case_law_id = UUID(str(record["id"]))
try:
# Parent-doc retrieval (TaskMaster #48): when enabled, emit
# two tiers (parents + children). Only children are embedded
# and indexed; parents carry retrieval context. When disabled,
# fall back to legacy single-tier chunking — identical
# behaviour to pre-V17.
if config.PARENT_DOC_RETRIEVAL_ENABLED:
await progress(
"chunking", 40,
f"מחלק את הטקסט ל-chunks היררכיים ({page_count} עמ')",
)
h_chunks = chunker.chunk_document_hierarchical(
text, page_offsets=page_offsets,
)
if not h_chunks:
await db.set_case_law_extraction_status(case_law_id, "completed")
await db.set_case_law_halacha_status(case_law_id, "completed")
await progress("completed", 100, "אין טקסט לעיבוד")
return {
"status": "completed",
"case_law_id": str(case_law_id),
"chunks": 0,
"halachot": 0,
}
children = [c for c in h_chunks if c.role == "child"]
parents = [c for c in h_chunks if c.role == "parent"]
await progress(
"embedding", 55,
f"מייצר embeddings ל-{len(children)} children "
f"({len(parents)} parents)",
)
child_texts = [c.content for c in children]
child_vectors = await embeddings.embed_texts(
child_texts, input_type="document",
)
# Build flat dict list for the two-pass writer.
chunk_dicts: list[dict] = []
for p in parents:
chunk_dicts.append({
"role": "parent",
"local_id": p.local_id,
"parent_local_id": None,
"chunk_index": p.chunk_index,
"content": p.content,
"section_type": p.section_type,
"page_number": p.page_number,
"embedding": None,
})
for c, v in zip(children, child_vectors):
chunk_dicts.append({
"role": "child",
"local_id": c.local_id,
"parent_local_id": c.parent_local_id,
"chunk_index": c.chunk_index,
"content": c.content,
"section_type": c.section_type,
"page_number": c.page_number,
"embedding": v,
})
counts = await db.store_precedent_chunks_hierarchical(
case_law_id, chunk_dicts,
)
stored_chunks = counts["children"]
else:
await progress(
"chunking", 40, f"מחלק את הטקסט ל-chunks ({page_count} עמ')",
)
chunks = chunker.chunk_document(text, page_offsets=page_offsets)
if not chunks:
await db.set_case_law_extraction_status(case_law_id, "completed")
await db.set_case_law_halacha_status(case_law_id, "completed")
await progress("completed", 100, "אין טקסט לעיבוד")
return {
"status": "completed",
"case_law_id": str(case_law_id),
"chunks": 0,
"halachot": 0,
}
await progress("embedding", 55, f"מייצר embeddings ל-{len(chunks)} chunks")
chunk_texts = [c.content for c in chunks]
chunk_vectors = await embeddings.embed_texts(chunk_texts, input_type="document")
chunk_dicts = [
{
"chunk_index": c.chunk_index,
"content": c.content,
"section_type": c.section_type,
"page_number": c.page_number,
"embedding": v,
}
for c, v in zip(chunks, chunk_vectors)
]
stored_chunks = await db.store_precedent_chunks(case_law_id, chunk_dicts)
# Multimodal page-image embeddings (V9). Gated by feature flag.
# Non-fatal: text path already succeeded. Only PDFs.
if config.MULTIMODAL_ENABLED and page_count > 0 and staged.suffix.lower() == ".pdf":
try:
await progress(
"embedding_images", 70,
f"מטמיע {page_count} עמודי תמונה (multimodal)",
)
await _embed_precedent_pages(case_law_id, staged, page_count)
except Exception as e:
logger.warning("Precedent multimodal embedding failed (non-fatal): %s", e)
# Pipeline split: the container does the non-LLM half (extract +
# chunk + embed + store). LLM-driven extraction (metadata, halachot)
# runs separately via the MCP tool `precedent_process_pending` from
# local Claude Code, where `claude` CLI is available.
#
# We auto-queue both extractions so the chair doesn't need to click
# any button — the moment they (or me) run `precedent_process_pending`
# in chat, both kinds get processed.
await db.set_case_law_extraction_status(case_law_id, "completed")
await db.set_case_law_halacha_status(case_law_id, "pending")
await db.request_metadata_extraction(case_law_id)
await db.request_halacha_extraction(case_law_id)
await progress(
"completed",
100,
f"הוכנס לספרייה: {stored_chunks} chunks. "
f"חילוץ הלכות ומטא-דאטה ממתינים בתור — "
f"להפעיל מ-Claude Code: precedent_process_pending.",
)
return {
"status": "completed",
"case_law_id": str(case_law_id),
"chunks": stored_chunks,
"halachot": 0,
"halachot_pending": True,
"metadata_filled": [],
"pages": page_count,
}
except Exception as e:
logger.exception("precedent_library.ingest_precedent failed: %s", e)
await db.set_case_law_extraction_status(case_law_id, "failed")
await progress("failed", 100, f"כשל בעיבוד: {e}")
raise
async def reextract_halachot(
@@ -586,48 +400,3 @@ async def search_library(
subject_tag=subject_tag,
include_halachot=include_halachot,
)
async def _embed_precedent_pages(
case_law_id: UUID,
pdf_path: Path,
page_count: int,
) -> dict:
"""Render precedent PDF pages → embed via voyage-multimodal → store.
Thumbnails go to
``data/precedent-library/thumbnails/{case_law_id}/p{N:03d}.jpg``.
"""
thumb_dir = PRECEDENT_LIBRARY_DIR / "thumbnails" / str(case_law_id)
rendered = await asyncio.to_thread(
extractor.render_pages_for_multimodal,
pdf_path,
config.MULTIMODAL_DPI,
config.MULTIMODAL_THUMB_DPI,
thumb_dir,
)
images = [pil for pil, _ in rendered]
thumbs = [t for _, t in rendered]
img_embs = await embeddings.embed_images(images)
page_records = []
for i, (emb, thumb) in enumerate(zip(img_embs, thumbs)):
rel_thumb = None
if thumb is not None:
try:
rel_thumb = str(thumb.relative_to(config.DATA_DIR))
except ValueError:
rel_thumb = str(thumb)
page_records.append({
"page_number": i + 1,
"embedding": emb,
"image_thumbnail_path": rel_thumb,
})
stored = await db.store_precedent_image_embeddings(
case_law_id, page_records, model_name=config.MULTIMODAL_MODEL,
)
logger.info(
"Multimodal: stored %d page-image embeddings for case_law %s",
stored, case_law_id,
)
return {"pages_embedded": stored}