refactor(ingest): ingest_internal_decision delegates to canonical pipeline; queue metadata too (GAP-02, FU-1)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-30 19:19:10 +00:00
parent d7eb1b2824
commit 5104db8f4e

View File

@@ -16,21 +16,19 @@ Judicial decisions (Supreme Court, Administrative Court) stay in external_upload
from __future__ import annotations from __future__ import annotations
import logging import logging
import re
import shutil
from datetime import date
from pathlib import Path from pathlib import Path
from uuid import UUID, uuid4 from uuid import UUID
from legal_mcp import config from legal_mcp import config
from legal_mcp.services import chunker, db, embeddings, extractor from legal_mcp.services import db, embeddings, ingest
from legal_mcp.services.practice_area import derive_proceeding_type from legal_mcp.services.practice_area import derive_proceeding_type
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
INTERNAL_DECISIONS_DIR = Path(config.DATA_DIR) / "internal-decisions" INTERNAL_DECISIONS_DIR = Path(config.DATA_DIR) / "internal-decisions"
_VALID_DISTRICTS = {"", "ירושלים", "מרכז", "תל אביב", "צפון", "דרום", "ארצי"} _VALID_PRACTICE_AREAS = frozenset({"", "rishuy_uvniya", "betterment_levy", "compensation_197"})
_VALID_DISTRICTS = frozenset({"", "ירושלים", "מרכז", "תל אביב", "צפון", "דרום", "ארצי"})
_COURT_TO_DISTRICT = [ _COURT_TO_DISTRICT = [
("ירושלים", "ירושלים"), ("ירושלים", "ירושלים"),
@@ -45,24 +43,6 @@ _COURT_TO_DISTRICT = [
] ]
def _coerce_date(value) -> date | None:
if value is None or value == "":
return None
if isinstance(value, date):
return value
if isinstance(value, str):
try:
return date.fromisoformat(value[:10])
except ValueError:
return None
return None
def _safe_filename(name: str) -> str:
base = Path(name).name
return re.sub(r"[^\w.\-+א-ת ]", "_", base) or f"internal-{uuid4().hex[:8]}"
def _district_from_court(court: str) -> str: def _district_from_court(court: str) -> str:
for keyword, district in _COURT_TO_DISTRICT: for keyword, district in _COURT_TO_DISTRICT:
if keyword in court: if keyword in court:
@@ -70,6 +50,51 @@ def _district_from_court(court: str) -> str:
return "" return ""
def _internal_validate(inputs: dict) -> None:
if not (inputs.get("case_number") or "").strip():
raise ValueError("case_number is required")
def _internal_derive(inputs: dict) -> dict:
district = (inputs.get("district") or "").strip() or _district_from_court(inputs.get("court") or "")
proc = (inputs.get("proceeding_type") or "").strip() or derive_proceeding_type(
appeal_subtype=inputs.get("appeal_subtype") or "", subject=inputs.get("case_name") or "",
)
return {"district": district, "proceeding_type": proc}
async def _create_internal_record(**kw) -> dict:
    """Create the internal committee decision row from keyword inputs.

    Forwards the given fields to ``db.create_internal_committee_decision``.
    ``case_number``, ``case_name`` and ``full_text`` are required keys; the
    remaining fields are optional and defaulted as shown below.

    Returns:
        Whatever ``db.create_internal_committee_decision`` returns.
    """

    def _stripped(key: str) -> str:
        # Optional text field: missing/None becomes "", otherwise trimmed.
        return (kw.get(key) or "").strip()

    return await db.create_internal_committee_decision(
        case_number=kw["case_number"].strip(),
        case_name=kw["case_name"],
        full_text=kw["full_text"],
        court=_stripped("court"),
        decision_date=kw.get("decision_date"),
        chair_name=_stripped("chair_name"),
        district=kw.get("district", ""),
        practice_area=kw.get("practice_area", ""),
        appeal_subtype=_stripped("appeal_subtype"),
        subject_tags=list(kw.get("subject_tags") or []),
        summary=_stripped("summary"),
        is_binding=kw.get("is_binding", True),
        document_id=kw.get("document_id"),
        # An empty proceeding type falls back to the literal "ערר".
        proceeding_type=kw.get("proceeding_type") or "ערר",
    )
# Intake configuration for appeals-committee decisions. ingest_internal_decision
# passes this object to ingest.ingest_document together with the caller's inputs.
# NOTE(review): the meaning of each IntakeSpec field is defined in the ingest
# module, which is not visible here; the notes below describe only the values.
_INTERNAL_SPEC = ingest.IntakeSpec(
source_kind="internal_committee",
id_field="case_number",
staging_root=INTERNAL_DECISIONS_DIR,
# Yields the "district" input, or "other" when it is empty or missing.
staging_subdir=lambda inputs: (inputs.get("district") or "other"),
validate=_internal_validate,
# Allowed values per field; presumably enforced by the pipeline — confirm in ingest.
enum_fields={"practice_area": _VALID_PRACTICE_AREAS, "district": _VALID_DISTRICTS},
derive=_internal_derive,
display_name_fallback="case_number",
create_record=_create_internal_record,
)
async def ingest_internal_decision( async def ingest_internal_decision(
*, *,
case_number: str, case_number: str,
@@ -86,138 +111,23 @@ async def ingest_internal_decision(
file_path: str | Path | None = None, file_path: str | Path | None = None,
text: str | None = None, text: str | None = None,
document_id: UUID | None = None, document_id: UUID | None = None,
queue_halachot: bool = True, queue_halachot: bool = True, # retained for signature compat; pipeline always queues
proceeding_type: str = "", proceeding_type: str = "",
) -> dict: ) -> dict:
"""Ingest an appeals-committee decision into the internal corpus. """Ingest one appeals-committee decision. Thin wrapper over the canonical pipeline."""
inputs = {
Either file_path or text must be provided. "case_number": case_number, "case_name": case_name, "court": court,
If district is empty, it is inferred from court. "decision_date": decision_date, "chair_name": chair_name, "district": district,
If proceeding_type is empty, it is derived from appeal_subtype/case_name. "practice_area": practice_area, "appeal_subtype": appeal_subtype,
Returns: {"status": "completed", "case_law_id": "...", "chunks": N} "subject_tags": subject_tags, "summary": summary, "is_binding": is_binding,
""" "proceeding_type": proceeding_type,
if not file_path and not text: }
raise ValueError("either file_path or text is required") out = await ingest.ingest_document(
if not case_number.strip(): _INTERNAL_SPEC, inputs=inputs, file_path=file_path, text=text,
raise ValueError("case_number is required")
resolved_district = district.strip() or _district_from_court(court)
resolved_proc = proceeding_type.strip() or derive_proceeding_type(
appeal_subtype=appeal_subtype, subject=case_name,
)
if file_path:
src = Path(file_path)
if not src.is_file():
raise FileNotFoundError(f"file not found: {src}")
dest_dir = INTERNAL_DECISIONS_DIR / (resolved_district or "other")
dest_dir.mkdir(parents=True, exist_ok=True)
staged = dest_dir / f"{uuid4().hex[:8]}_{_safe_filename(src.name)}"
shutil.copy2(src, staged)
raw_text, page_count, page_offsets = await extractor.extract_text(str(staged))
raw_text = extractor.strip_nevo_preamble(raw_text or "").strip()
if not raw_text:
raise ValueError("no extractable text in file")
else:
raw_text = (text or "").strip()
if not raw_text:
raise ValueError("text is empty")
page_count = 0
page_offsets = None
record = await db.create_internal_committee_decision(
case_number=case_number.strip(),
case_name=(case_name.strip() or case_number.strip()),
full_text=raw_text,
court=court.strip(),
decision_date=_coerce_date(decision_date),
chair_name=chair_name.strip(),
district=resolved_district,
practice_area=practice_area,
appeal_subtype=appeal_subtype.strip(),
subject_tags=list(subject_tags or []),
summary=summary.strip(),
is_binding=is_binding,
document_id=document_id, document_id=document_id,
proceeding_type=resolved_proc,
) )
case_law_id = UUID(str(record["id"])) return {"status": out["status"], "case_law_id": out["case_law_id"],
"chunks": out["chunks"], "halachot_pending": True}
try:
# Parent-doc retrieval (TaskMaster #48) — same gated branch as
# ingest_precedent. Internal committee decisions are typically
# longer than external court rulings (full transcript + ruling),
# so the parent-doc benefit is even larger here.
if config.PARENT_DOC_RETRIEVAL_ENABLED:
h_chunks = chunker.chunk_document_hierarchical(
raw_text, page_offsets=page_offsets,
)
if not h_chunks:
await db.set_case_law_extraction_status(case_law_id, "completed")
await db.set_case_law_halacha_status(case_law_id, "completed")
return {"status": "completed", "case_law_id": str(case_law_id), "chunks": 0}
children = [c for c in h_chunks if c.role == "child"]
parents = [c for c in h_chunks if c.role == "parent"]
child_vectors = await embeddings.embed_texts(
[c.content for c in children], input_type="document",
)
chunk_dicts: list[dict] = []
for p in parents:
chunk_dicts.append({
"role": "parent", "local_id": p.local_id, "parent_local_id": None,
"chunk_index": p.chunk_index, "content": p.content,
"section_type": p.section_type, "page_number": p.page_number,
"embedding": None,
})
for c, v in zip(children, child_vectors):
chunk_dicts.append({
"role": "child", "local_id": c.local_id,
"parent_local_id": c.parent_local_id,
"chunk_index": c.chunk_index, "content": c.content,
"section_type": c.section_type, "page_number": c.page_number,
"embedding": v,
})
counts = await db.store_precedent_chunks_hierarchical(
case_law_id, chunk_dicts,
)
stored = counts["children"]
else:
chunks = chunker.chunk_document(raw_text, page_offsets=page_offsets)
if not chunks:
await db.set_case_law_extraction_status(case_law_id, "completed")
await db.set_case_law_halacha_status(case_law_id, "completed")
return {"status": "completed", "case_law_id": str(case_law_id), "chunks": 0}
chunk_texts = [c.content for c in chunks]
chunk_vectors = await embeddings.embed_texts(chunk_texts, input_type="document")
chunk_dicts = [
{
"chunk_index": c.chunk_index,
"content": c.content,
"section_type": c.section_type,
"page_number": c.page_number,
"embedding": v,
}
for c, v in zip(chunks, chunk_vectors)
]
stored = await db.store_precedent_chunks(case_law_id, chunk_dicts)
await db.set_case_law_extraction_status(case_law_id, "completed")
await db.set_case_law_halacha_status(case_law_id, "pending")
if queue_halachot:
await db.request_halacha_extraction(case_law_id)
return {
"status": "completed",
"case_law_id": str(case_law_id),
"chunks": stored,
"halachot_pending": True,
}
except Exception:
logger.exception("ingest_internal_decision failed for %s", case_number)
await db.set_case_law_extraction_status(case_law_id, "failed")
raise
async def migrate_from_style_corpus(dry_run: bool = False, queue_halachot: bool = True) -> dict: async def migrate_from_style_corpus(dry_run: bool = False, queue_halachot: bool = True) -> dict: