feat(ingest): canonical ingest_document pipeline (FU-1)

This commit is contained in:
2026-05-30 19:13:15 +00:00
parent d4663eba8f
commit be4f7bbe99

View File

@@ -113,3 +113,145 @@ async def _embed_pages(case_law_id: UUID, pdf_path: Path, page_count: int) -> di
def spec_thumb_dir(case_law_id: UUID) -> Path:
"""Thumbnails live under the precedent-library tree regardless of intake type."""
return Path(config.DATA_DIR) / "precedent-library" / "thumbnails" / str(case_law_id)
async def ingest_document(
spec: IntakeSpec,
*,
inputs: dict,
file_path: str | Path | None = None,
text: str | None = None,
document_id: UUID | None = None,
progress: ProgressCb | None = None,
) -> dict:
"""Run the canonical 12-step pipeline for one intake item.
``inputs`` carries the type-specific record fields (citation/case_number,
case_name, court, practice_area, etc.). ``spec`` decides how they are
validated, staged, derived, and which DB-create runs. Returns a dict with
at least: status, case_law_id, chunks.
"""
progress = progress or _noop_progress
# Step 1: input validation (type-specific) + enums (uniform mechanism).
if not file_path and text is None:
raise ValueError("either file_path or text is required")
spec.validate(inputs)
_validate_enums(spec, inputs)
# Step 2: field derivation (identity for external).
inputs = {**inputs, **spec.derive(inputs)}
# Steps 3-5: stage (if file) + extract + strip.
page_count = 0
page_offsets = None
staged: Path | None = None
if file_path:
src = Path(file_path)
if not src.is_file():
raise FileNotFoundError(f"file not found: {src}")
await progress("staging", 5, "מעתיק את הקובץ לאחסון")
staged = _stage_file(src, spec.staging_root, spec.staging_subdir(inputs))
await progress("extracting", 15, "מחלץ טקסט מהקובץ")
try:
raw_text, page_count, page_offsets = await extractor.extract_text(str(staged))
except Exception as e:
await progress("failed", 100, f"כשל בחילוץ טקסט: {e}")
raise
raw_text = extractor.strip_nevo_preamble((raw_text or "")).strip()
else:
raw_text = (text or "").strip()
if not raw_text:
await progress("failed", 100, "לא נמצא טקסט בקובץ")
raise ValueError("no extractable text in file")
# Step 6: DB create (type-specific, routed — get case_law_id).
await progress("storing_metadata", 25, "שומר את הרשומה במסד הנתונים")
display_name = (inputs.get("case_name") or "").strip() or (
inputs.get(spec.display_name_fallback) or ""
).strip()
record = await spec.create_record(
full_text=raw_text,
case_name=display_name,
decision_date=_coerce_date(inputs.get("decision_date")),
document_id=document_id,
**{k: v for k, v in inputs.items()
if k not in {"case_name", "decision_date", "file_path", "text"}},
)
case_law_id = UUID(str(record["id"]))
try:
stored_chunks = await _chunk_embed_store(case_law_id, raw_text, page_offsets, page_count, progress)
# Step 9: multimodal — uniform: flag + PDF + page_count, NOT intake type.
if (config.MULTIMODAL_ENABLED and page_count > 0
and staged is not None and staged.suffix.lower() == ".pdf"):
try:
await progress("embedding_images", 70, f"מטמיע {page_count} עמודי תמונה (multimodal)")
await _embed_pages(case_law_id, staged, page_count)
except Exception as e:
logger.warning("Multimodal embedding failed (non-fatal): %s", e)
# Steps 10-12: queue BOTH extractions (GAP-02 fix) + statuses.
await db.set_case_law_extraction_status(case_law_id, "completed")
await db.set_case_law_halacha_status(case_law_id, "pending")
await db.request_metadata_extraction(case_law_id)
await db.request_halacha_extraction(case_law_id)
await progress("completed", 100,
f"נקלט: {stored_chunks} chunks. חילוץ הלכות ומטא-דאטה ממתינים בתור.")
return {
"status": "completed",
"case_law_id": str(case_law_id),
"chunks": stored_chunks,
"halachot": 0,
"halachot_pending": True,
"metadata_filled": [],
"pages": page_count,
}
except Exception as e:
logger.exception("ingest_document failed (%s): %s", spec.source_kind, e)
await db.set_case_law_extraction_status(case_law_id, "failed")
await progress("failed", 100, f"כשל בעיבוד: {e}")
raise
async def _chunk_embed_store(case_law_id, text, page_offsets, page_count, progress) -> int:
"""Steps 7-8: chunk (hierarchical/flat by flag) → embed children → store."""
if config.PARENT_DOC_RETRIEVAL_ENABLED:
await progress("chunking", 40, f"מחלק את הטקסט ל-chunks היררכיים ({page_count} עמ')")
h_chunks = chunker.chunk_document_hierarchical(text, page_offsets=page_offsets)
if not h_chunks:
return 0
children = [c for c in h_chunks if c.role == "child"]
parents = [c for c in h_chunks if c.role == "parent"]
await progress("embedding", 55, f"מייצר embeddings ל-{len(children)} children ({len(parents)} parents)")
child_vectors = await embeddings.embed_texts([c.content for c in children], input_type="document")
chunk_dicts: list[dict] = []
for p in parents:
chunk_dicts.append({
"role": "parent", "local_id": p.local_id, "parent_local_id": None,
"chunk_index": p.chunk_index, "content": p.content,
"section_type": p.section_type, "page_number": p.page_number, "embedding": None,
})
for c, v in zip(children, child_vectors):
chunk_dicts.append({
"role": "child", "local_id": c.local_id, "parent_local_id": c.parent_local_id,
"chunk_index": c.chunk_index, "content": c.content,
"section_type": c.section_type, "page_number": c.page_number, "embedding": v,
})
counts = await db.store_precedent_chunks_hierarchical(case_law_id, chunk_dicts)
return counts["children"]
else:
await progress("chunking", 40, f"מחלק את הטקסט ל-chunks ({page_count} עמ')")
chunks = chunker.chunk_document(text, page_offsets=page_offsets)
if not chunks:
return 0
await progress("embedding", 55, f"מייצר embeddings ל-{len(chunks)} chunks")
chunk_vectors = await embeddings.embed_texts([c.content for c in chunks], input_type="document")
chunk_dicts = [
{"chunk_index": c.chunk_index, "content": c.content,
"section_type": c.section_type, "page_number": c.page_number, "embedding": v}
for c, v in zip(chunks, chunk_vectors)
]
return await db.store_precedent_chunks(case_law_id, chunk_dicts)