Files
legal-ai/mcp-server/src/legal_mcp/services/ingest.py
Chaim 1986fe3b14 feat(storage): X14 Phase 2a — route source-document writes through storage.py
Rewire the source-document staging writes onto the unified storage layer
(INV-STG1), replacing direct shutil.copy2 calls:
- tools/documents.py: case originals + training-corpus uploads
- services/ingest.py: _stage_file (now async) — covers precedent-library,
  internal-decisions, and digests (the canonical intake helper)
- services/digest_library.py: awaits the now-async _stage_file

Each write goes through storage.put_file(..., bucket=DOCUMENTS) with the
DATA_DIR-relative key; the Hebrew original filename rides as object metadata
(INV-STG2), content-type is guessed from the extension. DB path columns are
unchanged (still the absolute dest) — object_key backfill is Phase 3.

Under the default STORAGE_BACKEND=filesystem the bytes land at the exact
legacy on-disk location (put_file → shutil.copy2 to DATA_DIR/key), so this
is zero behaviour change in prod. shutil import dropped where now unused.

tests: +2 staging regression tests (file lands under DATA_DIR at the legacy
path); 20 storage + 22 ingest tests green; 242 collected with no import
breakage.

Derived/export write sites (thumbnails, extracted text, DOCX exports) are
Phase 2b. Keeps G2; advances INV-STG1. Spec: docs/spec/X14-storage-minio.md.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-08 08:00:27 +00:00

305 lines
13 KiB
Python

"""Canonical ingest pipeline (FU-1).
One pipeline for all sibling-entity intake types (external precedent,
internal committee decision). Per-type variation rides on an ``IntakeSpec``
config object — never a parallel function. See
docs/spec/01-ingest.md and docs/superpowers/specs/2026-05-30-fu1-unified-ingest-design.md.
claude_session rule preserved: this module only QUEUES extraction
(``request_*_extraction`` = pure DB writes). It never imports
halacha_extractor / precedent_metadata_extractor, so it is safe to call
from the FastAPI container where the ``claude`` CLI is unavailable.
"""
from __future__ import annotations

import asyncio
import logging
import mimetypes
import re
from dataclasses import dataclass
from datetime import date, datetime
from pathlib import Path
from typing import Awaitable, Callable
from uuid import UUID, uuid4

from legal_mcp import config
from legal_mcp.services import chunker, db, embeddings, extractor, storage
logger = logging.getLogger(__name__)
# Async progress callback: (status, percent, message) -> awaitable.
# Statuses emitted by this module: "staging", "extracting", "storing_metadata",
# "chunking", "embedding", "embedding_images", "completed", "failed"; the
# percent values used here run from 5 up to 100.
ProgressCb = Callable[[str, int, str], Awaitable[None]]
async def _noop_progress(_status: str, _percent: int, _msg: str) -> None:
return None
@dataclass(frozen=True)
class IntakeSpec:
    """Describes everything that varies between intake types.

    One spec per intake type is handed to ``ingest_document``; the pipeline
    itself is shared. The dataclass is frozen, so fields cannot be reassigned
    after construction. Field order is the positional constructor order.
    """
    # Label for the intake type; appears in ingest_document's failure log line.
    source_kind: str
    # NOTE(review): not read anywhere in this module — presumably names the
    # type's identifier field (citation vs case_number?); confirm with callers.
    id_field: str
    # Directory staged copies are written under; _stage_file derives the
    # storage key relative to config.DATA_DIR, so it must lie beneath it.
    staging_root: Path
    # Maps the (already derived) inputs to a subdirectory of staging_root;
    # a falsy result falls back to "other".
    staging_subdir: Callable[[dict], str]
    # Type-specific validation of the raw inputs; its return value is ignored,
    # so it signals rejection by raising.
    validate: Callable[[dict], None]
    # field name -> allowed values, enforced by _validate_enums (a missing or
    # falsy value is checked as "").
    enum_fields: dict[str, frozenset[str]]
    # Returns fields merged over the inputs ({**inputs, **derive(inputs)});
    # identity for external precedents.
    derive: Callable[[dict], dict]
    # Input key whose value is used as the display name when "case_name" is empty.
    display_name_fallback: str
    # Async DB create, called with keyword arguments only; the returned mapping
    # must contain an "id" convertible to a UUID.
    create_record: Callable[..., Awaitable[dict]]
def _coerce_date(value) -> date | None:
if value is None or value == "":
return None
if isinstance(value, date):
return value
if isinstance(value, str):
try:
return date.fromisoformat(value[:10])
except ValueError:
return None
return None
def _safe_filename(name: str) -> str:
base = Path(name).name
return re.sub(r"[^\w.\-+א-ת ]", "_", base) or f"upload-{uuid4().hex[:8]}"
async def _stage_file(src_path: Path, root: Path, subdir: str) -> Path:
"""Stage an intake file through the unified storage layer (INV-STG1).
Returns the DATA_DIR path the rest of the pipeline reads from — under the
filesystem/dual backends the bytes are on disk and the key is the
DATA_DIR-relative path. The Hebrew original filename rides as object
metadata, never as the key (INV-STG2)."""
dest = root / (subdir or "other") / f"{uuid4().hex[:8]}_{_safe_filename(src_path.name)}"
key = dest.relative_to(config.DATA_DIR).as_posix()
await storage.put_file(
src_path, key, bucket=storage.Bucket.DOCUMENTS,
content_type=mimetypes.guess_type(src_path.name)[0],
metadata={"filename": src_path.name},
)
return dest
def _validate_enums(spec: IntakeSpec, inputs: dict) -> None:
for field_name, allowed in spec.enum_fields.items():
value = inputs.get(field_name, "") or ""
if value not in allowed:
raise ValueError(f"invalid {field_name}: {value!r}")
async def _embed_pages(case_law_id: UUID, pdf_path: Path, page_count: int) -> dict:
    """Embed each rendered PDF page as an image and persist the vectors.

    Rendering runs off the event loop via ``asyncio.to_thread``. The renderer
    yields ``(image, thumbnail)`` pairs; images go to
    ``embeddings.embed_images``. Each stored record's thumbnail path is made
    relative to ``config.DATA_DIR`` when possible and left as given otherwise.
    ``page_count`` is accepted but not read — the rendered pages are
    authoritative. Errors propagate; the caller treats them as non-fatal.

    Returns:
        ``{"pages_embedded": <count returned by the DB layer>}``.
    """
    target_dir = spec_thumb_dir(case_law_id)
    page_pairs = await asyncio.to_thread(
        extractor.render_pages_for_multimodal,
        pdf_path,
        config.MULTIMODAL_DPI,
        config.MULTIMODAL_THUMB_DPI,
        target_dir,
    )
    pil_pages = [image for image, _ in page_pairs]
    thumb_paths = [thumb for _, thumb in page_pairs]
    vectors = await embeddings.embed_images(pil_pages)

    def _thumb_ref(thumb):
        # DATA_DIR-relative when the thumbnail lives under it, else as given.
        if thumb is None:
            return None
        try:
            return str(thumb.relative_to(config.DATA_DIR))
        except ValueError:
            return str(thumb)

    records = [
        {
            "page_number": page_no,
            "embedding": vector,
            "image_thumbnail_path": _thumb_ref(thumb),
        }
        for page_no, (vector, thumb) in enumerate(zip(vectors, thumb_paths), start=1)
    ]
    stored = await db.store_precedent_image_embeddings(
        case_law_id, records, model_name=config.MULTIMODAL_MODEL,
    )
    logger.info("Multimodal: stored %d page-image embeddings for case_law %s", stored, case_law_id)
    return {"pages_embedded": stored}
def spec_thumb_dir(case_law_id: UUID) -> Path:
    """Return ``DATA_DIR/precedent-library/thumbnails/<case_law_id>``.

    The same tree is used for every intake type, including internal decisions.
    """
    return Path(config.DATA_DIR).joinpath("precedent-library", "thumbnails", str(case_law_id))
async def _mark_failed(case_law_id: UUID) -> None:
    """Best-effort: set a case_law row's extraction status to "failed".

    Used on ingest_document's error path. A second error here, for example
    when the DB itself is what failed, must not replace the original
    exception or skip the final "failed" progress event.
    """
    try:
        await db.set_case_law_extraction_status(case_law_id, "failed")
    except Exception as status_err:  # noqa: BLE001 — never mask the original failure
        logger.warning("could not mark case_law %s as failed: %s", case_law_id, status_err)


async def ingest_document(
    spec: IntakeSpec,
    *,
    inputs: dict,
    file_path: str | Path | None = None,
    text: str | None = None,
    document_id: UUID | None = None,
    progress: ProgressCb | None = None,
) -> dict:
    """Run the canonical 12-step pipeline for one intake item.

    ``inputs`` carries the type-specific record fields (citation/case_number,
    case_name, court, practice_area, etc.). ``spec`` decides how they are
    validated, staged, derived, and which DB-create runs.

    Args:
        spec: per-intake-type configuration.
        inputs: record fields; merged with ``spec.derive(inputs)`` before use.
            ``case_name``, ``decision_date``, ``file_path``, ``text``,
            ``full_text`` and ``document_id`` are not forwarded verbatim to
            ``spec.create_record``. The first two are normalised; the others
            are passed explicitly or are pipeline inputs rather than record
            fields.
        file_path: source file to stage and extract; wins over ``text``.
        text: pre-extracted text, used when no ``file_path`` is given.
        document_id: forwarded to ``spec.create_record``.
        progress: async ``(status, percent, message)`` callback; no-op by default.

    Returns:
        ``{"status": "completed", "case_law_id": str, "chunks": int,
        "halachot": 0, "halachot_pending": True, "metadata_filled": [],
        "pages": int}``. Halacha and metadata extraction are only queued here.

    Raises:
        ValueError: neither ``file_path`` nor ``text`` was given, an enum
            field is invalid, or no text remained after extraction/stripping.
            ``spec.validate`` may raise its own errors too.
        FileNotFoundError: ``file_path`` is not an existing file.
        Exception: failures while staging, extracting, creating the record, or
            indexing are re-raised after a ``"failed"`` progress event. Once
            the record exists it is also marked extraction-"failed" (best-effort).
    """
    progress = progress or _noop_progress

    # Step 1: input validation (type-specific) + enums (uniform mechanism).
    if not file_path and text is None:
        raise ValueError("either file_path or text is required")
    spec.validate(inputs)
    _validate_enums(spec, inputs)

    # Step 2: field derivation (identity for external).
    inputs = {**inputs, **spec.derive(inputs)}

    # Steps 3-5: stage (if file) + extract + strip.
    page_count = 0
    page_offsets = None
    staged: Path | None = None
    if file_path:
        src = Path(file_path)
        if not src.is_file():
            raise FileNotFoundError(f"file not found: {src}")
        await progress("staging", 5, "מעתיק את הקובץ לאחסון")
        try:
            staged = await _stage_file(src, spec.staging_root, spec.staging_subdir(inputs))
        except Exception as e:
            # Report it, so progress watchers do not stall at "staging".
            await progress("failed", 100, f"כשל בעיבוד: {e}")
            raise
        await progress("extracting", 15, "מחלץ טקסט מהקובץ")
        try:
            raw_text, page_count, page_offsets = await extractor.extract_text(str(staged))
        except Exception as e:
            await progress("failed", 100, f"כשל בחילוץ טקסט: {e}")
            raise
        raw_text = raw_text or ""
    else:
        raw_text = text or ""

    # Capture the Nevo "mini-ratio" (editorial holdings summary) BEFORE
    # stripping the preamble: it is a free professional gold set for
    # benchmarking halacha extraction (#86.3). Stored once we have the row id.
    nevo_ratio = extractor.extract_nevo_ratio(raw_text)
    raw_text = extractor.strip_nevo_preamble(raw_text).strip()
    if not raw_text:
        await progress("failed", 100, "לא נמצא טקסט בקובץ")
        raise ValueError("no extractable text in file")

    # Step 6: DB create (type-specific, routed — get case_law_id).
    await progress("storing_metadata", 25, "שומר את הרשומה במסד הנתונים")
    display_name = (inputs.get("case_name") or "").strip() or (
        inputs.get(spec.display_name_fallback) or ""
    ).strip()
    # full_text/document_id are passed explicitly below. Forwarding them again
    # from inputs would raise TypeError (multiple values for keyword argument).
    reserved = {"case_name", "decision_date", "file_path", "text", "full_text", "document_id"}
    try:
        record = await spec.create_record(
            full_text=raw_text,
            case_name=display_name,
            decision_date=_coerce_date(inputs.get("decision_date")),
            document_id=document_id,
            **{k: v for k, v in inputs.items() if k not in reserved},
        )
        case_law_id = UUID(str(record["id"]))
    except Exception as e:
        # Report it, so progress watchers do not stall at "storing_metadata".
        await progress("failed", 100, f"כשל בעיבוד: {e}")
        raise

    # Persist the captured mini-ratio (best-effort; never block ingest on it).
    if nevo_ratio:
        try:
            await db.update_case_law(case_law_id, nevo_ratio=nevo_ratio)
        except Exception as e:  # noqa: BLE001 — additive metadata, non-fatal
            logger.warning("could not store nevo_ratio for %s: %s", case_law_id, e)

    try:
        # Steps 7-8: chunk + embed + store.
        stored_chunks = await _chunk_embed_store(case_law_id, raw_text, page_offsets, page_count, progress)
        await db.mark_indexed(case_law_id)

        # Step 9: multimodal — uniform: flag + PDF + page_count, NOT intake type.
        if (config.MULTIMODAL_ENABLED and page_count > 0
                and staged is not None and staged.suffix.lower() == ".pdf"):
            try:
                await progress("embedding_images", 70, f"מטמיע {page_count} עמודי תמונה (multimodal)")
                await _embed_pages(case_law_id, staged, page_count)
            except Exception as e:
                logger.warning("Multimodal embedding failed (non-fatal): %s", e)

        # Steps 10-12: queue BOTH extractions (GAP-02 fix) + statuses.
        await db.set_case_law_extraction_status(case_law_id, "completed")
        await db.set_case_law_halacha_status(case_law_id, "pending")
        await db.request_metadata_extraction(case_law_id)
        await db.request_halacha_extraction(case_law_id)
        await db.recompute_searchable(case_law_id)
        await progress("completed", 100,
                       f"נקלט: {stored_chunks} chunks. חילוץ הלכות ומטא-דאטה ממתינים בתור.")
        return {
            "status": "completed",
            "case_law_id": str(case_law_id),
            "chunks": stored_chunks,
            "halachot": 0,
            "halachot_pending": True,
            "metadata_filled": [],
            "pages": page_count,
        }
    except Exception as e:
        logger.exception("ingest_document failed (%s): %s", spec.source_kind, e)
        await _mark_failed(case_law_id)
        await progress("failed", 100, f"כשל בעיבוד: {e}")
        raise
async def _embed_chunk_texts(texts: list[str]) -> list:
    """Embed chunk contents as documents: one vector per text, in order.

    An empty batch returns ``[]`` without calling the embedding service. A
    result whose length differs from the input raises ``RuntimeError``,
    instead of letting a later ``zip`` silently drop or misalign chunks.
    """
    if not texts:
        return []
    vectors = await embeddings.embed_texts(texts, input_type="document")
    if len(vectors) != len(texts):
        raise RuntimeError(
            f"embedding count mismatch: got {len(vectors)} vectors for {len(texts)} chunks"
        )
    return vectors


async def _chunk_embed_store(case_law_id, text, page_offsets, page_count, progress) -> int:
    """Steps 7-8: chunk (hierarchical/flat by flag) → embed → store.

    With ``config.PARENT_DOC_RETRIEVAL_ENABLED`` the text is split into parent
    and child chunks. Only children are embedded; parents are stored with
    ``embedding=None``. Otherwise flat chunks are embedded and stored.
    ``page_count`` is used only in the progress message.

    Returns:
        0 when the chunker yields nothing; otherwise the stored count reported
        by the DB layer (children only, in hierarchical mode).

    Raises:
        RuntimeError: the embedding service returned a different number of
            vectors than chunks sent.
    """
    if config.PARENT_DOC_RETRIEVAL_ENABLED:
        await progress("chunking", 40, f"מחלק את הטקסט ל-chunks היררכיים ({page_count} עמ')")
        h_chunks = chunker.chunk_document_hierarchical(text, page_offsets=page_offsets)
        if not h_chunks:
            return 0
        children = [c for c in h_chunks if c.role == "child"]
        parents = [c for c in h_chunks if c.role == "parent"]
        await progress("embedding", 55, f"מייצר embeddings ל-{len(children)} children ({len(parents)} parents)")
        # May be empty (parents only): the helper then skips the service call.
        child_vectors = await _embed_chunk_texts([c.content for c in children])
        chunk_dicts: list[dict] = [
            {
                "role": "parent", "local_id": p.local_id, "parent_local_id": None,
                "chunk_index": p.chunk_index, "content": p.content,
                "section_type": p.section_type, "page_number": p.page_number, "embedding": None,
            }
            for p in parents
        ]
        chunk_dicts.extend(
            {
                "role": "child", "local_id": c.local_id, "parent_local_id": c.parent_local_id,
                "chunk_index": c.chunk_index, "content": c.content,
                "section_type": c.section_type, "page_number": c.page_number, "embedding": v,
            }
            for c, v in zip(children, child_vectors)
        )
        counts = await db.store_precedent_chunks_hierarchical(case_law_id, chunk_dicts)
        return counts["children"]

    await progress("chunking", 40, f"מחלק את הטקסט ל-chunks ({page_count} עמ')")
    chunks = chunker.chunk_document(text, page_offsets=page_offsets)
    if not chunks:
        return 0
    await progress("embedding", 55, f"מייצר embeddings ל-{len(chunks)} chunks")
    chunk_vectors = await _embed_chunk_texts([c.content for c in chunks])
    chunk_dicts = [
        {"chunk_index": c.chunk_index, "content": c.content,
         "section_type": c.section_type, "page_number": c.page_number, "embedding": v}
        for c, v in zip(chunks, chunk_vectors)
    ]
    return await db.store_precedent_chunks(case_law_id, chunk_dicts)
async def reindex_case_law(
    case_law_id: "UUID | str",
    progress: ProgressCb | None = None,
) -> dict:
    """Re-chunk and re-embed an existing case_law row from its stored full_text (GAP-09).

    No re-extraction or OCR is done (the stored text is used; see
    feedback_no_reocr_retrofit). No LLM/CLI is involved — only the chunker and
    voyage embeddings — so it is safe to run anywhere. It is idempotent
    because store_precedent_chunks(_hierarchical) is DELETE-then-INSERT. No
    page offsets are passed, so per-chunk page numbers are presumably not
    recovered.

    NOTE(review): unlike ingest_document, this does not call
    db.recompute_searchable afterwards — confirm that is intended.

    Raises:
        ValueError: *case_law_id* is not a valid UUID, the row does not exist,
            or it has no stored full_text.
    """
    report = progress if progress else _noop_progress
    if isinstance(case_law_id, UUID):
        cid = case_law_id
    else:
        cid = UUID(str(case_law_id))

    row = await db.get_case_law(cid)
    if not row:
        raise ValueError(f"case_law not found: {cid}")
    full_text = (row.get("full_text") or "").strip()
    if not full_text:
        raise ValueError("case_law has no stored full_text to re-index")

    chunk_count = await _chunk_embed_store(cid, full_text, None, 0, report)
    await db.mark_indexed(cid)
    await report("completed", 100, f"הוטמע מחדש: {chunk_count} chunks")
    return {"status": "completed", "case_law_id": str(cid), "chunks": chunk_count, "reindexed": True}