"""Orchestrator for the Internal Committee Decisions corpus.

Ingest pipeline:
    text/file → INSERT case_law (source_kind='internal_committee')
              → chunk → embed → store precedent_chunks
              → queue halacha extraction

Migration helpers:
    migrate_from_style_corpus()    — re-index style_corpus entries as searchable
    migrate_from_external_corpus() — reclassify external appeals-committee rows

All appeals-committee decisions (any district) belong here. Judicial decisions
(Supreme Court, Administrative Court) stay in external_upload.
"""

from __future__ import annotations

import logging
import re
import shutil
from datetime import date
from pathlib import Path
from uuid import UUID, uuid4

from legal_mcp import config
from legal_mcp.services import chunker, db, embeddings, extractor

logger = logging.getLogger(__name__)

INTERNAL_DECISIONS_DIR = Path(config.DATA_DIR) / "internal-decisions"

# District names that may be used as a staging sub-directory ("" = unknown).
_VALID_DISTRICTS = {"", "ירושלים", "מרכז", "תל אביב", "צפון", "דרום", "ארצי"}

# (keyword found in the court name, district) — scanned in order, first hit wins.
_COURT_TO_DISTRICT = [
    ("ירושלים", "ירושלים"),
    ("תל אביב", "תל אביב"),
    ('ת"א', "תל אביב"),
    ("מרכז", "מרכז"),
    ("חיפה", "צפון"),
    ("צפון", "צפון"),
    ("דרום", "דרום"),
    ("ארצי", "ארצי"),
    ("ארצית", "ארצי"),
]

# Characters outside this allow-list are replaced with "_" in staged file names.
_UNSAFE_FILENAME_CHARS = re.compile(r"[^\w.\-+א-ת ]")

# practice_area values accepted as-is when migrating from style_corpus.
_STYLE_PRACTICE_AREAS = ("rishuy_uvniya", "betterment_levy", "compensation_197")

# Fallback appeal_subtype → practice_area mapping for style_corpus rows whose
# practice_area column does not hold one of the values above.
_SUBTYPE_TO_PRACTICE_AREA = {
    "building_permit": "rishuy_uvniya",
    "betterment_levy": "betterment_levy",
    "compensation_197": "compensation_197",
}


def _coerce_date(value) -> date | None:
    """Convert *value* to a plain ``date``, or ``None`` when it cannot be parsed.

    Accepts ``None``/``""`` (→ ``None``), ``date``/``datetime`` instances and
    strings whose first ten characters (after stripping whitespace) are an ISO
    ``YYYY-MM-DD`` date. Any other type yields ``None``.
    """
    if value is None or value == "":
        return None
    if isinstance(value, date):
        # datetime is a date subclass; rebuild so the declared return type
        # (a plain date, no time part) always holds.
        return date(value.year, value.month, value.day)
    if isinstance(value, str):
        try:
            return date.fromisoformat(value.strip()[:10])
        except ValueError:
            return None
    return None


def _safe_filename(name: str) -> str:
    """Return the final path component of *name* with unsafe characters replaced.

    Falls back to a random ``internal-<8 hex>`` name when nothing is left.
    """
    base = Path(name).name
    return _UNSAFE_FILENAME_CHARS.sub("_", base) or f"internal-{uuid4().hex[:8]}"


def _district_from_court(court: str) -> str:
    """Infer the district from a court name; return ``""`` when no keyword matches."""
    for keyword, district in _COURT_TO_DISTRICT:
        if keyword in court:
            return district
    return ""


def _discard_staged(path: Path) -> None:
    """Best-effort removal of a staged copy; never raises."""
    try:
        path.unlink(missing_ok=True)
    except OSError:
        logger.warning("could not remove staged file %s", path, exc_info=True)


async def ingest_internal_decision(
    *,
    case_number: str,
    case_name: str = "",
    court: str = "",
    decision_date=None,
    chair_name: str = "",
    district: str = "",
    practice_area: str = "",
    appeal_subtype: str = "",
    subject_tags: list[str] | None = None,
    summary: str = "",
    is_binding: bool = True,
    file_path: str | Path | None = None,
    text: str | None = None,
    document_id: UUID | None = None,
    queue_halachot: bool = True,
) -> dict:
    """Ingest an appeals-committee decision into the internal corpus.

    Either file_path or text must be provided; when both are given the file is
    used. If district is empty, it is inferred from court.

    A file is first copied under ``INTERNAL_DECISIONS_DIR/<district>``. Only
    districts listed in ``_VALID_DISTRICTS`` are used as a directory name;
    anything else is staged under ``other`` so a caller-supplied district can
    never point outside the corpus directory. The district stored in the
    database is not affected by this.

    Returns:
        {"status": "completed", "case_law_id": "...", "chunks": N} — plus
        "halachot_pending": True when at least one chunk was stored.

    Raises:
        ValueError: no input given, empty case_number, or no usable text.
        FileNotFoundError: file_path does not point to a regular file.
        Exception: any error from chunking/embedding/storing is re-raised after
            the record's extraction status has been set to "failed".
    """
    if not file_path and not text:
        raise ValueError("either file_path or text is required")
    clean_case_number = case_number.strip()
    if not clean_case_number:
        raise ValueError("case_number is required")

    resolved_district = district.strip() or _district_from_court(court)

    if file_path:
        src = Path(file_path)
        if not src.is_file():
            raise FileNotFoundError(f"file not found: {src}")

        # district is caller-supplied: never let an arbitrary string become a
        # path component (e.g. "../x"); unknown values share the "other" bucket.
        dir_name = resolved_district if resolved_district in _VALID_DISTRICTS else ""
        dest_dir = INTERNAL_DECISIONS_DIR / (dir_name or "other")
        dest_dir.mkdir(parents=True, exist_ok=True)
        # Random prefix keeps repeated uploads of the same file name apart.
        staged = dest_dir / f"{uuid4().hex[:8]}_{_safe_filename(src.name)}"
        shutil.copy2(src, staged)

        try:
            raw_text, _page_count, page_offsets = await extractor.extract_text(str(staged))
            raw_text = extractor.strip_nevo_preamble(raw_text or "").strip()
            if not raw_text:
                raise ValueError("no extractable text in file")
        except Exception:
            # Nothing was ingested, so the staged copy would only be an orphan.
            _discard_staged(staged)
            raise
    else:
        raw_text = (text or "").strip()
        if not raw_text:
            raise ValueError("text is empty")
        page_offsets = None

    record = await db.create_internal_committee_decision(
        case_number=clean_case_number,
        case_name=(case_name.strip() or clean_case_number),
        full_text=raw_text,
        court=court.strip(),
        decision_date=_coerce_date(decision_date),
        chair_name=chair_name.strip(),
        district=resolved_district,
        practice_area=practice_area,
        appeal_subtype=appeal_subtype.strip(),
        subject_tags=list(subject_tags or []),
        summary=summary.strip(),
        is_binding=is_binding,
        document_id=document_id,
    )
    case_law_id = UUID(str(record["id"]))

    try:
        chunks = chunker.chunk_document(raw_text, page_offsets=page_offsets)
        if not chunks:
            # Nothing to embed or extract from: close both stages immediately.
            await db.set_case_law_extraction_status(case_law_id, "completed")
            await db.set_case_law_halacha_status(case_law_id, "completed")
            return {"status": "completed", "case_law_id": str(case_law_id), "chunks": 0}

        chunk_vectors = await embeddings.embed_texts(
            [c.content for c in chunks], input_type="document"
        )
        chunk_dicts = [
            {
                "chunk_index": c.chunk_index,
                "content": c.content,
                "section_type": c.section_type,
                "page_number": c.page_number,
                "embedding": v,
            }
            for c, v in zip(chunks, chunk_vectors)
        ]
        stored = await db.store_precedent_chunks(case_law_id, chunk_dicts)

        await db.set_case_law_extraction_status(case_law_id, "completed")
        await db.set_case_law_halacha_status(case_law_id, "pending")
        if queue_halachot:
            await db.request_halacha_extraction(case_law_id)

        return {
            "status": "completed",
            "case_law_id": str(case_law_id),
            "chunks": stored,
            "halachot_pending": True,
        }
    except Exception:
        logger.exception("ingest_internal_decision failed for %s", case_number)
        try:
            await db.set_case_law_extraction_status(case_law_id, "failed")
        except Exception:
            # Do not let a failing status update replace the real error below.
            logger.exception("could not mark case_law %s as failed", case_law_id)
        raise


async def migrate_from_style_corpus(dry_run: bool = False, queue_halachot: bool = True) -> dict:
    """Re-index all style_corpus entries as searchable internal committee decisions.

    Does NOT delete style_corpus rows — they remain for style analysis.
    Skips rows without a decision number and entries that already exist in
    case_law as internal_committee. The existence check is read-only and runs
    in dry-run mode too, so the dry-run counts match what a real run would do.

    Returns:
        {"total", "ingested", "skipped", "failed", "dry_run"} counters. In
        dry-run mode "ingested" counts rows that would be ingested.
    """
    pool = await db.get_pool()
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            """SELECT decision_number, decision_date, full_text,
                      practice_area, appeal_subtype, subject_categories
               FROM style_corpus
               ORDER BY decision_date NULLS LAST"""
        )

    results = {"total": len(rows), "ingested": 0, "skipped": 0, "failed": 0, "dry_run": dry_run}
    # Dry-run only: numbers already counted as "would ingest", so a repeated
    # decision_number is skipped just as the real run's DB check would skip it.
    planned: set[str] = set()

    for row in rows:
        case_number = (row["decision_number"] or "").strip()
        if not case_number:
            results["skipped"] += 1
            continue

        existing = await pool.fetchval(
            "SELECT id FROM case_law WHERE case_number = $1 AND source_kind = 'internal_committee'",
            case_number,
        )
        if existing or case_number in planned:
            results["skipped"] += 1
            continue

        if dry_run:
            planned.add(case_number)
            results["ingested"] += 1
            continue

        try:
            subject_tags = list(row["subject_categories"] or [])
            raw_pa = row["practice_area"] or ""
            subtype = row["appeal_subtype"] or ""
            # style_corpus stores 'appeals_committee' (source_type) instead of
            # practice_area, so fall back to deriving it from the subtype.
            if raw_pa in _STYLE_PRACTICE_AREAS:
                practice_area = raw_pa
            else:
                practice_area = _SUBTYPE_TO_PRACTICE_AREA.get(subtype, "")

            await ingest_internal_decision(
                case_number=case_number,
                court="ועדת הערר לתכנון ובנייה — מחוז ירושלים",
                decision_date=row["decision_date"],
                chair_name="דפנה תמיר",
                district="ירושלים",
                practice_area=practice_area,
                appeal_subtype=subtype,
                subject_tags=subject_tags,
                text=row["full_text"],
                queue_halachot=queue_halachot,
            )
            results["ingested"] += 1
            logger.info("Migrated style_corpus entry: %s", case_number)
        except Exception as e:
            # One bad row must not abort the whole migration; keep the traceback.
            logger.error("Failed to migrate %s: %s", case_number, e, exc_info=True)
            results["failed"] += 1

    return results


async def migrate_from_external_corpus(dry_run: bool = False) -> dict:
    """Reclassify external appeals-committee decisions to source_kind='internal_committee'.

    Identifies rows by source_type='appeals_committee' and updates source_kind + district.
    The district is inferred from the court name and only overwritten when the
    inference is non-empty. Existing precedent_chunks remain — no re-embedding needed.

    Returns:
        {"total", "updated", "dry_run"}; a dry run also carries a "preview" list
        of {"case_number", "court", "district"} and changes nothing.
    """
    pool = await db.get_pool()
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            """SELECT id, case_number, court
               FROM case_law
               WHERE source_kind = 'external_upload'
                 AND source_type = 'appeals_committee'"""
        )

    results = {"total": len(rows), "updated": 0, "dry_run": dry_run}

    if dry_run:
        results["updated"] = len(rows)
        results["preview"] = [
            {
                "case_number": r["case_number"],
                "court": r["court"],
                "district": _district_from_court(r["court"] or ""),
            }
            for r in rows
        ]
        return results

    async with pool.acquire() as conn:
        for row in rows:
            district = _district_from_court(row["court"] or "")
            await conn.execute(
                """UPDATE case_law
                   SET source_kind = 'internal_committee',
                       district = CASE WHEN $2 <> '' THEN $2 ELSE district END
                   WHERE id = $1""",
                row["id"],
                district,
            )

    results["updated"] = len(rows)
    logger.info("Migrated %d external appeals-committee rows to internal_committee", len(rows))
    return results


async def enrich_migrated_entries(dry_run: bool = False) -> dict:
    """One-time enrichment: run metadata extraction + halacha extraction on all
    internal_committee entries that are waiting (halacha_status='pending',
    metadata never requested).

    Metadata extraction will:
    - Fix case_number from the decision header text
    - Fill case_name from the parties line
    - Fill date if missing

    Halacha extraction queues the LLM-based halacha extraction job.

    Returns:
        {"total", "metadata_updated", "halachot_queued", "failed", "dry_run"}.
        "failed" counts metadata failures only; such rows are not queued for
        halacha extraction. A failure to queue is logged but not counted.
        A dry run reports "total" and does no work.
    """
    # Imported here rather than at module level, as in the original layout.
    from legal_mcp.services import precedent_metadata_extractor

    pool = await db.get_pool()
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            """SELECT id, case_number
               FROM case_law
               WHERE source_kind = 'internal_committee'
                 AND halacha_extraction_status = 'pending'
                 AND metadata_extraction_requested_at IS NULL
               ORDER BY created_at"""
        )

    results = {
        "total": len(rows),
        "metadata_updated": 0,
        "halachot_queued": 0,
        "failed": 0,
        "dry_run": dry_run,
    }
    if dry_run:
        return results

    for row in rows:
        case_law_id = row["id"]

        try:
            meta = await precedent_metadata_extractor.extract_and_apply(
                case_law_id, overwrite_case_number=True
            )
            if meta.get("status") in ("completed", "no_changes"):
                results["metadata_updated"] += 1
            logger.info(
                "enrich_migrated: %s → fields=%s",
                row["case_number"],
                meta.get("fields"),
            )
        except Exception as e:
            logger.error("enrich_migrated metadata failed for %s: %s", row["case_number"], e)
            results["failed"] += 1
            continue

        try:
            await db.request_halacha_extraction(case_law_id)
            results["halachot_queued"] += 1
        except Exception as e:
            logger.error("enrich_migrated halacha queue failed for %s: %s", row["case_number"], e)

    return results


async def search_internal(
    query: str,
    *,
    practice_area: str = "",
    appeal_subtype: str = "",
    district: str = "",
    chair_name: str = "",
    limit: int = 10,
    include_halachot: bool = True,
) -> list[dict]:
    """Semantic search over internal committee decisions.

    Embeds *query* and delegates to the hybrid precedent-library search,
    restricted to source_kind="internal_committee" and the given filters.
    A blank query returns an empty list without embedding anything.
    """
    from legal_mcp.services import hybrid_search

    if not query.strip():
        return []

    query_vec = await embeddings.embed_query(query)
    return await hybrid_search.search_precedent_library_hybrid(
        query=query,
        query_text_embedding=query_vec,
        limit=limit,
        practice_area=practice_area,
        appeal_subtype=appeal_subtype,
        include_halachot=include_halachot,
        source_kind="internal_committee",
        district=district,
        chair_name=chair_name,
    )