diff --git a/mcp-server/src/legal_mcp/services/internal_decisions.py b/mcp-server/src/legal_mcp/services/internal_decisions.py index 5c051a2..5a0dd4e 100644 --- a/mcp-server/src/legal_mcp/services/internal_decisions.py +++ b/mcp-server/src/legal_mcp/services/internal_decisions.py @@ -16,21 +16,19 @@ Judicial decisions (Supreme Court, Administrative Court) stay in external_upload from __future__ import annotations import logging -import re -import shutil -from datetime import date from pathlib import Path -from uuid import UUID, uuid4 +from uuid import UUID from legal_mcp import config -from legal_mcp.services import chunker, db, embeddings, extractor +from legal_mcp.services import db, embeddings, ingest from legal_mcp.services.practice_area import derive_proceeding_type logger = logging.getLogger(__name__) INTERNAL_DECISIONS_DIR = Path(config.DATA_DIR) / "internal-decisions" -_VALID_DISTRICTS = {"", "ירושלים", "מרכז", "תל אביב", "צפון", "דרום", "ארצי"} +_VALID_PRACTICE_AREAS = frozenset({"", "rishuy_uvniya", "betterment_levy", "compensation_197"}) +_VALID_DISTRICTS = frozenset({"", "ירושלים", "מרכז", "תל אביב", "צפון", "דרום", "ארצי"}) _COURT_TO_DISTRICT = [ ("ירושלים", "ירושלים"), @@ -45,24 +43,6 @@ _COURT_TO_DISTRICT = [ ] -def _coerce_date(value) -> date | None: - if value is None or value == "": - return None - if isinstance(value, date): - return value - if isinstance(value, str): - try: - return date.fromisoformat(value[:10]) - except ValueError: - return None - return None - - -def _safe_filename(name: str) -> str: - base = Path(name).name - return re.sub(r"[^\w.\-+א-ת ]", "_", base) or f"internal-{uuid4().hex[:8]}" - - def _district_from_court(court: str) -> str: for keyword, district in _COURT_TO_DISTRICT: if keyword in court: @@ -70,6 +50,51 @@ def _district_from_court(court: str) -> str: return "" +def _internal_validate(inputs: dict) -> None: + if not (inputs.get("case_number") or "").strip(): + raise ValueError("case_number is required") + + +def _internal_derive(inputs: dict) -> dict: + district = (inputs.get("district") or "").strip() or _district_from_court(inputs.get("court") or "") + proc = (inputs.get("proceeding_type") or "").strip() or derive_proceeding_type( + appeal_subtype=inputs.get("appeal_subtype") or "", subject=inputs.get("case_name") or "", + ) + return {"district": district, "proceeding_type": proc} + + +async def _create_internal_record(**kw) -> dict: + return await db.create_internal_committee_decision( + case_number=kw["case_number"].strip(), + case_name=kw["case_name"], + full_text=kw["full_text"], + court=(kw.get("court") or "").strip(), + decision_date=kw.get("decision_date"), + chair_name=(kw.get("chair_name") or "").strip(), + district=kw.get("district", ""), + practice_area=kw.get("practice_area", ""), + appeal_subtype=(kw.get("appeal_subtype") or "").strip(), + subject_tags=list(kw.get("subject_tags") or []), + summary=(kw.get("summary") or "").strip(), + is_binding=kw.get("is_binding", True), + document_id=kw.get("document_id"), + proceeding_type=kw.get("proceeding_type") or "ערר", + ) + + +_INTERNAL_SPEC = ingest.IntakeSpec( + source_kind="internal_committee", + id_field="case_number", + staging_root=INTERNAL_DECISIONS_DIR, + staging_subdir=lambda inputs: (inputs.get("district") or "other"), + validate=_internal_validate, + enum_fields={"practice_area": _VALID_PRACTICE_AREAS, "district": _VALID_DISTRICTS}, + derive=_internal_derive, + display_name_fallback="case_number", + create_record=_create_internal_record, +) + + async def ingest_internal_decision( *, case_number: str, @@ -86,138 +111,23 @@ async def ingest_internal_decision( file_path: str | Path | None = None, text: str | None = None, document_id: UUID | None = None, - queue_halachot: bool = True, + queue_halachot: bool = True, # retained for signature compat; pipeline always queues proceeding_type: str = "", ) -> dict: - """Ingest an appeals-committee decision into the internal corpus. - - Either file_path or text must be provided. - If district is empty, it is inferred from court. - If proceeding_type is empty, it is derived from appeal_subtype/case_name. - Returns: {"status": "completed", "case_law_id": "...", "chunks": N} - """ - if not file_path and not text: - raise ValueError("either file_path or text is required") - if not case_number.strip(): - raise ValueError("case_number is required") - - resolved_district = district.strip() or _district_from_court(court) - resolved_proc = proceeding_type.strip() or derive_proceeding_type( - appeal_subtype=appeal_subtype, subject=case_name, - ) - - if file_path: - src = Path(file_path) - if not src.is_file(): - raise FileNotFoundError(f"file not found: {src}") - dest_dir = INTERNAL_DECISIONS_DIR / (resolved_district or "other") - dest_dir.mkdir(parents=True, exist_ok=True) - staged = dest_dir / f"{uuid4().hex[:8]}_{_safe_filename(src.name)}" - shutil.copy2(src, staged) - raw_text, page_count, page_offsets = await extractor.extract_text(str(staged)) - raw_text = extractor.strip_nevo_preamble(raw_text or "").strip() - if not raw_text: - raise ValueError("no extractable text in file") - else: - raw_text = (text or "").strip() - if not raw_text: - raise ValueError("text is empty") - page_count = 0 - page_offsets = None - - record = await db.create_internal_committee_decision( - case_number=case_number.strip(), - case_name=(case_name.strip() or case_number.strip()), - full_text=raw_text, - court=court.strip(), - decision_date=_coerce_date(decision_date), - chair_name=chair_name.strip(), - district=resolved_district, - practice_area=practice_area, - appeal_subtype=appeal_subtype.strip(), - subject_tags=list(subject_tags or []), - summary=summary.strip(), - is_binding=is_binding, + """Ingest one appeals-committee decision. Thin wrapper over the canonical pipeline.""" + inputs = { + "case_number": case_number, "case_name": case_name, "court": court, + "decision_date": decision_date, "chair_name": chair_name, "district": district, + "practice_area": practice_area, "appeal_subtype": appeal_subtype, + "subject_tags": subject_tags, "summary": summary, "is_binding": is_binding, + "proceeding_type": proceeding_type, + } + out = await ingest.ingest_document( + _INTERNAL_SPEC, inputs=inputs, file_path=file_path, text=text, document_id=document_id, - proceeding_type=resolved_proc, ) - case_law_id = UUID(str(record["id"])) - - try: - # Parent-doc retrieval (TaskMaster #48) — same gated branch as - # ingest_precedent. Internal committee decisions are typically - # longer than external court rulings (full transcript + ruling), - # so the parent-doc benefit is even larger here. - if config.PARENT_DOC_RETRIEVAL_ENABLED: - h_chunks = chunker.chunk_document_hierarchical( - raw_text, page_offsets=page_offsets, - ) - if not h_chunks: - await db.set_case_law_extraction_status(case_law_id, "completed") - await db.set_case_law_halacha_status(case_law_id, "completed") - return {"status": "completed", "case_law_id": str(case_law_id), "chunks": 0} - children = [c for c in h_chunks if c.role == "child"] - parents = [c for c in h_chunks if c.role == "parent"] - child_vectors = await embeddings.embed_texts( - [c.content for c in children], input_type="document", - ) - chunk_dicts: list[dict] = [] - for p in parents: - chunk_dicts.append({ - "role": "parent", "local_id": p.local_id, "parent_local_id": None, - "chunk_index": p.chunk_index, "content": p.content, - "section_type": p.section_type, "page_number": p.page_number, - "embedding": None, - }) - for c, v in zip(children, child_vectors): - chunk_dicts.append({ - "role": "child", "local_id": c.local_id, - "parent_local_id": c.parent_local_id, - "chunk_index": c.chunk_index, "content": c.content, - "section_type": c.section_type, "page_number": c.page_number, - "embedding": v, - }) - counts = await db.store_precedent_chunks_hierarchical( - case_law_id, chunk_dicts, - ) - stored = counts["children"] - else: - chunks = chunker.chunk_document(raw_text, page_offsets=page_offsets) - if not chunks: - await db.set_case_law_extraction_status(case_law_id, "completed") - await db.set_case_law_halacha_status(case_law_id, "completed") - return {"status": "completed", "case_law_id": str(case_law_id), "chunks": 0} - - chunk_texts = [c.content for c in chunks] - chunk_vectors = await embeddings.embed_texts(chunk_texts, input_type="document") - chunk_dicts = [ - { - "chunk_index": c.chunk_index, - "content": c.content, - "section_type": c.section_type, - "page_number": c.page_number, - "embedding": v, - } - for c, v in zip(chunks, chunk_vectors) - ] - stored = await db.store_precedent_chunks(case_law_id, chunk_dicts) - - await db.set_case_law_extraction_status(case_law_id, "completed") - await db.set_case_law_halacha_status(case_law_id, "pending") - if queue_halachot: - await db.request_halacha_extraction(case_law_id) - - return { - "status": "completed", - "case_law_id": str(case_law_id), - "chunks": stored, - "halachot_pending": True, - } - - except Exception: - logger.exception("ingest_internal_decision failed for %s", case_number) - await db.set_case_law_extraction_status(case_law_id, "failed") - raise + return {"status": out["status"], "case_law_id": out["case_law_id"], + "chunks": out["chunks"], "halachot_pending": True} async def migrate_from_style_corpus(dry_run: bool = False, queue_halachot: bool = True) -> dict: