"""Orchestrator for the Internal Committee Decisions corpus.

Ingest pipeline:
  text/file → INSERT case_law (source_kind='internal_committee')
  → chunk → embed → store precedent_chunks → queue halacha extraction

Migration helpers:
  migrate_from_style_corpus()    — re-index style_corpus entries as searchable
  migrate_from_external_corpus() — reclassify external appeals-committee rows
  enrich_migrated_entries()      — metadata + halacha extraction for migrated rows

All appeals-committee decisions (ועדות ערר), from any district, belong here.
Judicial decisions (Supreme Court, Administrative Court) stay in external_upload.
"""

from __future__ import annotations

import logging
from pathlib import Path
from uuid import UUID

from legal_mcp import config
from legal_mcp.services import db, embeddings, ingest
from legal_mcp.services.practice_area import derive_proceeding_type

logger = logging.getLogger(__name__)

INTERNAL_DECISIONS_DIR = Path(config.DATA_DIR) / "internal-decisions"

_VALID_PRACTICE_AREAS = frozenset({"", "rishuy_uvniya", "betterment_levy", "compensation_197"})
_VALID_DISTRICTS = frozenset({"", "ירושלים", "מרכז", "תל אביב", "צפון", "דרום", "ארצי"})

# Ordered (keyword, district) pairs; the first keyword found as a substring of
# the court name wins. "ארצי" also matches "ארצית", so no separate entry is needed.
_COURT_TO_DISTRICT = [
    ("ירושלים", "ירושלים"),
    ("תל אביב", "תל אביב"),
    ('ת"א', "תל אביב"),
    ("מרכז", "מרכז"),
    ("חיפה", "צפון"),
    ("צפון", "צפון"),
    ("דרום", "דרום"),
    ("ארצי", "ארצי"),
]

# style_corpus stores the source type ('appeals_committee') in its practice_area
# column, so the real practice area is recovered from appeal_subtype instead.
_STYLE_SUBTYPE_TO_PRACTICE_AREA = {
    "building_permit": "rishuy_uvniya",
    "betterment_levy": "betterment_levy",
    "compensation_197": "compensation_197",
}

# Fixed provenance applied to every style_corpus entry during migration.
_STYLE_CORPUS_COURT = "ועדת הערר לתכנון ובנייה — מחוז ירושלים"
_STYLE_CORPUS_CHAIR = "דפנה תמיר"
_STYLE_CORPUS_DISTRICT = "ירושלים"


def _district_from_court(court: str) -> str:
    """Return the district whose keyword first appears in *court*, or "" if none match."""
    court = court or ""
    for keyword, district in _COURT_TO_DISTRICT:
        if keyword in court:
            return district
    return ""


def _internal_validate(inputs: dict) -> None:
    """Reject intake inputs that have no (non-blank) case_number.

    Raises:
        ValueError: if case_number is missing or whitespace-only.
    """
    if not (inputs.get("case_number") or "").strip():
        raise ValueError("case_number is required")


def _internal_derive(inputs: dict) -> dict:
    """Fill district and proceeding_type when the caller left them blank.

    district falls back to a keyword match on the court name; proceeding_type
    falls back to ``derive_proceeding_type`` using appeal_subtype and case_name.
    """
    district = (inputs.get("district") or "").strip() or _district_from_court(inputs.get("court") or "")
    proc = (inputs.get("proceeding_type") or "").strip() or derive_proceeding_type(
        appeal_subtype=inputs.get("appeal_subtype") or "",
        subject=inputs.get("case_name") or "",
    )
    return {"district": district, "proceeding_type": proc}


async def _create_internal_record(**kw) -> dict:
    """Insert the case_law row via ``db.create_internal_committee_decision``.

    Free-text fields are stripped. proceeding_type defaults to "ערר" when empty.
    """
    return await db.create_internal_committee_decision(
        case_number=kw["case_number"].strip(),
        case_name=kw["case_name"],
        full_text=kw["full_text"],
        court=(kw.get("court") or "").strip(),
        decision_date=kw.get("decision_date"),
        chair_name=(kw.get("chair_name") or "").strip(),
        district=kw.get("district", ""),
        practice_area=kw.get("practice_area", ""),
        appeal_subtype=(kw.get("appeal_subtype") or "").strip(),
        subject_tags=list(kw.get("subject_tags") or []),
        summary=(kw.get("summary") or "").strip(),
        is_binding=kw.get("is_binding", True),
        document_id=kw.get("document_id"),
        proceeding_type=kw.get("proceeding_type") or "ערר",
    )


_INTERNAL_SPEC = ingest.IntakeSpec(
    source_kind="internal_committee",
    id_field="case_number",
    staging_root=INTERNAL_DECISIONS_DIR,
    # Staged files are grouped per district; blank district → "other".
    staging_subdir=lambda inputs: (inputs.get("district") or "other"),
    validate=_internal_validate,
    enum_fields={"practice_area": _VALID_PRACTICE_AREAS, "district": _VALID_DISTRICTS},
    derive=_internal_derive,
    display_name_fallback="case_number",
    create_record=_create_internal_record,
)


async def ingest_internal_decision(
    *,
    case_number: str,
    case_name: str = "",
    court: str = "",
    decision_date=None,
    chair_name: str = "",
    district: str = "",
    practice_area: str = "",
    appeal_subtype: str = "",
    subject_tags: list[str] | None = None,
    summary: str = "",
    is_binding: bool = True,
    file_path: str | Path | None = None,
    text: str | None = None,
    document_id: UUID | None = None,
    proceeding_type: str = "",
) -> dict:
    """Ingest one appeals-committee decision.

    Thin wrapper over the canonical pipeline (``ingest.ingest_document`` with
    the internal-committee intake spec). Content comes from *file_path* or *text*.

    Returns:
        ``status``, ``case_law_id`` and ``chunks`` from the pipeline, plus
        ``halachot_pending=True``.
    """
    inputs = {
        "case_number": case_number,
        "case_name": case_name,
        "court": court,
        "decision_date": decision_date,
        "chair_name": chair_name,
        "district": district,
        "practice_area": practice_area,
        "appeal_subtype": appeal_subtype,
        "subject_tags": subject_tags,
        "summary": summary,
        "is_binding": is_binding,
        "proceeding_type": proceeding_type,
    }
    out = await ingest.ingest_document(
        _INTERNAL_SPEC,
        inputs=inputs,
        file_path=file_path,
        text=text,
        document_id=document_id,
    )
    return {
        "status": out["status"],
        "case_law_id": out["case_law_id"],
        "chunks": out["chunks"],
        "halachot_pending": True,
    }


async def migrate_from_style_corpus(dry_run: bool = False) -> dict:
    """Re-index all style_corpus entries as searchable internal committee decisions.

    Does NOT delete style_corpus rows — they remain for style analysis.
    Skips entries without a decision number and entries that already exist in
    case_law as internal_committee. Every entry is attributed to the fixed
    Jerusalem court/chair constants defined at module level.

    Args:
        dry_run: When True nothing is written; ``ingested`` counts the entries
            a real run would attempt to ingest (existing rows and in-batch
            duplicates are counted as skipped, as a real run would).

    Returns:
        Counters ``total``, ``ingested``, ``skipped``, ``failed`` and the
        ``dry_run`` flag.
    """
    pool = await db.get_pool()
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            """SELECT decision_number, decision_date, full_text, practice_area,
                      appeal_subtype, subject_categories
               FROM style_corpus
               ORDER BY decision_date NULLS LAST"""
        )

    results = {"total": len(rows), "ingested": 0, "skipped": 0, "failed": 0, "dry_run": dry_run}
    # Case numbers already counted by a dry run, so duplicates inside
    # style_corpus are reported as skipped (a real run skips them once the
    # first copy has been inserted).
    counted_in_dry_run: set[str] = set()

    for row in rows:
        case_number = (row["decision_number"] or "").strip()
        if not case_number:
            results["skipped"] += 1
            continue

        # Read-only check, so it also runs in dry-run mode; otherwise the
        # preview over-reports entries that a real run would skip.
        existing = await pool.fetchval(
            "SELECT id FROM case_law WHERE case_number = $1 AND source_kind = 'internal_committee'",
            case_number,
        )
        if existing:
            results["skipped"] += 1
            continue

        if dry_run:
            if case_number in counted_in_dry_run:
                results["skipped"] += 1
            else:
                counted_in_dry_run.add(case_number)
                results["ingested"] += 1
            continue

        try:
            subject_tags = list(row["subject_categories"] or [])
            raw_pa = row["practice_area"] or ""
            subtype = row["appeal_subtype"] or ""
            if raw_pa and raw_pa in _VALID_PRACTICE_AREAS:
                practice_area = raw_pa
            else:
                practice_area = _STYLE_SUBTYPE_TO_PRACTICE_AREA.get(subtype, "")
            await ingest_internal_decision(
                case_number=case_number,
                court=_STYLE_CORPUS_COURT,
                decision_date=row["decision_date"],
                chair_name=_STYLE_CORPUS_CHAIR,
                district=_STYLE_CORPUS_DISTRICT,
                practice_area=practice_area,
                appeal_subtype=subtype,
                subject_tags=subject_tags,
                text=row["full_text"],
            )
        except Exception as e:
            # Best-effort batch: record the failure (with traceback) and move on.
            logger.exception("Failed to migrate %s: %s", case_number, e)
            results["failed"] += 1
        else:
            results["ingested"] += 1
            logger.info("Migrated style_corpus entry: %s", case_number)

    return results


async def migrate_from_external_corpus(dry_run: bool = False) -> dict:
    """Reclassify external appeals-committee decisions to source_kind='internal_committee'.

    Identifies rows by source_type='appeals_committee' (and
    source_kind='external_upload'), then updates source_kind and — when a
    district can be derived from the court name — district. Existing
    precedent_chunks remain; no re-embedding is done here.

    All updates run in a single transaction, so the reclassification is
    all-or-nothing.

    Returns:
        ``total``, ``updated`` and ``dry_run``; a dry run also returns a
        ``preview`` list of case_number / court / derived district.
    """
    pool = await db.get_pool()
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            """SELECT id, case_number, court FROM case_law
               WHERE source_kind = 'external_upload'
                 AND source_type = 'appeals_committee'"""
        )

    results = {"total": len(rows), "updated": 0, "dry_run": dry_run}

    if dry_run:
        results["updated"] = len(rows)
        results["preview"] = [
            {
                "case_number": r["case_number"],
                "court": r["court"],
                "district": _district_from_court(r["court"] or ""),
            }
            for r in rows
        ]
        return results

    async with pool.acquire() as conn:
        # One transaction: a failure midway must not leave the corpus half-migrated.
        async with conn.transaction():
            for row in rows:
                district = _district_from_court(row["court"] or "")
                # Keep the stored district when none can be derived from the court.
                await conn.execute(
                    """UPDATE case_law
                       SET source_kind = 'internal_committee',
                           district = CASE WHEN $2 <> '' THEN $2 ELSE district END
                       WHERE id = $1""",
                    row["id"],
                    district,
                )

    results["updated"] = len(rows)
    logger.info("Migrated %d external appeals-committee rows to internal_committee", len(rows))
    return results


async def enrich_migrated_entries(dry_run: bool = False) -> dict:
    """One-time enrichment of migrated internal_committee entries.

    Targets internal_committee rows with
    ``halacha_extraction_status = 'pending'`` whose metadata extraction was
    never requested (``metadata_extraction_requested_at IS NULL``), oldest
    first. For each row:

    1. ``precedent_metadata_extractor.extract_and_apply`` with
       ``overwrite_case_number=True`` — intended to fix case_number from the
       decision header, fill case_name from the parties line and fill a
       missing date (behavior lives in that module).
    2. ``db.request_halacha_extraction`` to queue LLM-based halacha extraction.

    If step 1 raises, step 2 is skipped for that row.

    Returns:
        ``total``, ``metadata_updated`` (status "completed" or "no_changes"),
        ``halachot_queued``, ``failed`` (metadata failures),
        ``halacha_queue_failed`` and ``dry_run``. A dry run only fills ``total``.
    """
    from legal_mcp.services import precedent_metadata_extractor

    pool = await db.get_pool()
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            """SELECT id, case_number FROM case_law
               WHERE source_kind = 'internal_committee'
                 AND halacha_extraction_status = 'pending'
                 AND metadata_extraction_requested_at IS NULL
               ORDER BY created_at"""
        )

    results = {
        "total": len(rows),
        "metadata_updated": 0,
        "halachot_queued": 0,
        "failed": 0,
        "halacha_queue_failed": 0,
        "dry_run": dry_run,
    }

    if dry_run:
        return results

    for row in rows:
        case_law_id = row["id"]
        try:
            meta = await precedent_metadata_extractor.extract_and_apply(
                case_law_id, overwrite_case_number=True
            )
            if meta.get("status") in ("completed", "no_changes"):
                results["metadata_updated"] += 1
                logger.info(
                    "enrich_migrated: %s → fields=%s",
                    row["case_number"],
                    meta.get("fields"),
                )
        except Exception as e:
            logger.exception("enrich_migrated metadata failed for %s: %s", row["case_number"], e)
            results["failed"] += 1
            continue

        try:
            await db.request_halacha_extraction(case_law_id)
        except Exception as e:
            logger.exception("enrich_migrated halacha queue failed for %s: %s", row["case_number"], e)
            results["halacha_queue_failed"] += 1
        else:
            results["halachot_queued"] += 1

    return results


async def search_internal(
    query: str,
    *,
    practice_area: str = "",
    appeal_subtype: str = "",
    district: str = "",
    chair_name: str = "",
    limit: int = 10,
    include_halachot: bool = True,
) -> list[dict]:
    """Semantic search over internal committee decisions.

    Embeds *query* and delegates to the hybrid precedent search, restricted to
    source_kind='internal_committee'. A blank query returns [] without
    embedding anything.
    """
    from legal_mcp.services import hybrid_search

    if not query.strip():
        return []
    query_vec = await embeddings.embed_query(query)
    return await hybrid_search.search_precedent_library_hybrid(
        query=query,
        query_text_embedding=query_vec,
        limit=limit,
        practice_area=practice_area,
        appeal_subtype=appeal_subtype,
        include_halachot=include_halachot,
        source_kind="internal_committee",
        district=district,
        chair_name=chair_name,
    )