From be4f7bbe9962f6b5e0448e7289520790168ddba9 Mon Sep 17 00:00:00 2001
From: Chaim
Date: Sat, 30 May 2026 19:13:15 +0000
Subject: [PATCH] feat(ingest): canonical ingest_document pipeline (FU-1)

---
 mcp-server/src/legal_mcp/services/ingest.py | 177 ++++++++++++++++++++
 1 file changed, 177 insertions(+)

diff --git a/mcp-server/src/legal_mcp/services/ingest.py b/mcp-server/src/legal_mcp/services/ingest.py
index 64c7b82..45a9cc6 100644
--- a/mcp-server/src/legal_mcp/services/ingest.py
+++ b/mcp-server/src/legal_mcp/services/ingest.py
@@ -113,3 +113,180 @@ async def _embed_pages(case_law_id: UUID, pdf_path: Path, page_count: int) -> di
 def spec_thumb_dir(case_law_id: UUID) -> Path:
     """Thumbnails live under the precedent-library tree regardless of intake type."""
     return Path(config.DATA_DIR) / "precedent-library" / "thumbnails" / str(case_law_id)
+
+
+async def ingest_document(
+    spec: IntakeSpec,
+    *,
+    inputs: dict,
+    file_path: str | Path | None = None,
+    text: str | None = None,
+    document_id: UUID | None = None,
+    progress: ProgressCb | None = None,
+) -> dict:
+    """Run the canonical 12-step pipeline for one intake item.
+
+    ``inputs`` carries the type-specific record fields (citation/case_number,
+    case_name, court, practice_area, etc.). ``spec`` decides how they are
+    validated, staged, derived, and which DB-create runs.
+
+    Args:
+        spec: Intake spec providing ``validate``, ``derive``, the staging
+            location and ``create_record``.
+        inputs: Record fields; merged with ``spec.derive(inputs)`` before use.
+        file_path: File to stage and extract text from. Takes precedence
+            over ``text`` when both are given.
+        text: Raw text to ingest when no file is supplied.
+        document_id: Passed through unchanged to ``spec.create_record``.
+        progress: Awaitable callback called as ``(stage, percent, message)``;
+            falls back to ``_noop_progress``.
+
+    Returns:
+        A dict with at least: status, case_law_id, chunks.
+
+    Raises:
+        ValueError: Neither ``file_path`` nor ``text`` was given, or no text
+            is left after extraction and stripping.
+        FileNotFoundError: ``file_path`` is not an existing file.
+        Exception: Any failure after the record is created is re-raised once
+            the record has been marked ``failed`` (best-effort).
+    """
+    progress = progress or _noop_progress
+
+    # Step 1: input validation (type-specific) + enums (uniform mechanism).
+    if not file_path and text is None:
+        raise ValueError("either file_path or text is required")
+    spec.validate(inputs)
+    _validate_enums(spec, inputs)
+
+    # Step 2: field derivation (identity for external).
+    inputs = {**inputs, **spec.derive(inputs)}
+
+    # Steps 3-5: stage (if file) + extract + strip.
+    page_count = 0
+    page_offsets = None
+    staged: Path | None = None
+    if file_path:
+        src = Path(file_path)
+        if not src.is_file():
+            raise FileNotFoundError(f"file not found: {src}")
+        await progress("staging", 5, "מעתיק את הקובץ לאחסון")
+        staged = _stage_file(src, spec.staging_root, spec.staging_subdir(inputs))
+        await progress("extracting", 15, "מחלץ טקסט מהקובץ")
+        try:
+            raw_text, page_count, page_offsets = await extractor.extract_text(str(staged))
+        except Exception as e:
+            await progress("failed", 100, f"כשל בחילוץ טקסט: {e}")
+            raise
+        raw_text = extractor.strip_nevo_preamble((raw_text or "")).strip()
+    else:
+        raw_text = (text or "").strip()
+    if not raw_text:
+        await progress("failed", 100, "לא נמצא טקסט בקובץ")
+        raise ValueError("no extractable text in file")
+
+    # Step 6: DB create (type-specific, routed — get case_law_id).
+    await progress("storing_metadata", 25, "שומר את הרשומה במסד הנתונים")
+    display_name = (inputs.get("case_name") or "").strip() or (
+        inputs.get(spec.display_name_fallback) or ""
+    ).strip()
+    record = await spec.create_record(
+        full_text=raw_text,
+        case_name=display_name,
+        decision_date=_coerce_date(inputs.get("decision_date")),
+        document_id=document_id,
+        **{k: v for k, v in inputs.items()
+           if k not in {"case_name", "decision_date", "file_path", "text"}},
+    )
+    case_law_id = UUID(str(record["id"]))
+
+    try:
+        stored_chunks = await _chunk_embed_store(case_law_id, raw_text, page_offsets, page_count, progress)
+
+        # Step 9: multimodal — uniform: flag + PDF + page_count, NOT intake type.
+        if (config.MULTIMODAL_ENABLED and page_count > 0
+                and staged is not None and staged.suffix.lower() == ".pdf"):
+            try:
+                await progress("embedding_images", 70, f"מטמיע {page_count} עמודי תמונה (multimodal)")
+                await _embed_pages(case_law_id, staged, page_count)
+            except Exception as e:
+                logger.warning("Multimodal embedding failed (non-fatal): %s", e)
+
+        # Steps 10-12: queue BOTH extractions (GAP-02 fix) + statuses.
+        await db.set_case_law_extraction_status(case_law_id, "completed")
+        await db.set_case_law_halacha_status(case_law_id, "pending")
+        await db.request_metadata_extraction(case_law_id)
+        await db.request_halacha_extraction(case_law_id)
+
+        await progress("completed", 100,
+                       f"נקלט: {stored_chunks} chunks. חילוץ הלכות ומטא-דאטה ממתינים בתור.")
+        return {
+            "status": "completed",
+            "case_law_id": str(case_law_id),
+            "chunks": stored_chunks,
+            "halachot": 0,
+            "halachot_pending": True,
+            "metadata_filled": [],
+            "pages": page_count,
+        }
+    except Exception as e:
+        logger.exception("ingest_document failed (%s): %s", spec.source_kind, e)
+        # Best-effort: when the DB is the cause of the failure, a second DB
+        # error here must not replace the original exception or skip the
+        # progress notification.
+        try:
+            await db.set_case_law_extraction_status(case_law_id, "failed")
+        except Exception:
+            logger.exception("could not mark case_law %s as failed", case_law_id)
+        await progress("failed", 100, f"כשל בעיבוד: {e}")
+        raise
+
+
+async def _chunk_embed_store(case_law_id, text, page_offsets, page_count, progress) -> int:
+    """Steps 7-8: chunk (hierarchical/flat by flag) → embed children → store.
+
+    Returns the stored child count in hierarchical mode, otherwise whatever
+    ``db.store_precedent_chunks`` returns; 0 when the chunker yields nothing.
+
+    Raises:
+        ValueError: The embedder returned a different number of vectors than
+            chunks, so pairing them would silently drop or misalign data.
+    """
+    if config.PARENT_DOC_RETRIEVAL_ENABLED:
+        await progress("chunking", 40, f"מחלק את הטקסט ל-chunks היררכיים ({page_count} עמ')")
+        h_chunks = chunker.chunk_document_hierarchical(text, page_offsets=page_offsets)
+        if not h_chunks:
+            return 0
+        children = [c for c in h_chunks if c.role == "child"]
+        parents = [c for c in h_chunks if c.role == "parent"]
+        await progress("embedding", 55, f"מייצר embeddings ל-{len(children)} children ({len(parents)} parents)")
+        child_vectors = await embeddings.embed_texts([c.content for c in children], input_type="document")
+        chunk_dicts: list[dict] = []
+        # Parents are stored without an embedding; only children are embedded.
+        for p in parents:
+            chunk_dicts.append({
+                "role": "parent", "local_id": p.local_id, "parent_local_id": None,
+                "chunk_index": p.chunk_index, "content": p.content,
+                "section_type": p.section_type, "page_number": p.page_number, "embedding": None,
+            })
+        for c, v in zip(children, child_vectors, strict=True):
+            chunk_dicts.append({
+                "role": "child", "local_id": c.local_id, "parent_local_id": c.parent_local_id,
+                "chunk_index": c.chunk_index, "content": c.content,
+                "section_type": c.section_type, "page_number": c.page_number, "embedding": v,
+            })
+        counts = await db.store_precedent_chunks_hierarchical(case_law_id, chunk_dicts)
+        return counts["children"]
+    else:
+        await progress("chunking", 40, f"מחלק את הטקסט ל-chunks ({page_count} עמ')")
+        chunks = chunker.chunk_document(text, page_offsets=page_offsets)
+        if not chunks:
+            return 0
+        await progress("embedding", 55, f"מייצר embeddings ל-{len(chunks)} chunks")
+        chunk_vectors = await embeddings.embed_texts([c.content for c in chunks], input_type="document")
+        chunk_dicts = [
+            {"chunk_index": c.chunk_index, "content": c.content,
+             "section_type": c.section_type, "page_number": c.page_number, "embedding": v}
+            for c, v in zip(chunks, chunk_vectors, strict=True)
+        ]
+        return await db.store_precedent_chunks(case_law_id, chunk_dicts)