Files
legal-ai/mcp-server/src/legal_mcp/services/internal_decisions.py
Chaim e8bcb9c1ea
All checks were successful
G12 Leak-Guard / leak-guard (pull_request) Successful in 6s
fix(cases): מספור 5-ספרתי לבל"מ — סיווג, ולידציה, וחיפוש פסיקה-חסרה
נוהל-יו"ר (2026-06-11): מבנה מספר-תיק = <סידורי>-<חודש>-<שנה>, ואורך הסידורי
מקודד את סוג-ההליך — 4 ספרות = ערר, 5 ספרות = בל"מ. הספרה הראשונה ממשיכה
לקבוע תחום בשני האורכים (1→רישוי, 8→היטל, 9→פיצויים). הכלל חד-כיווני:
5-ספרתי הוא תמיד בל"מ; 4-ספרתי אינו מחייב ערר (בל"מ-מורשת מזוהה מהנושא).

הבאג שדיווח עליו היו"ר: חיפוש פסיקה-חסרה לפי מספר-תיק החזיר 404 על כל ערך
שאינו תיק קיים — שבר את הטבלה תוך כדי הקלדה ועל מספרי 5-ספרות.

תיקונים:
- web/app.py: GET /api/missing-precedents — מסנן case_number שלא תאם תיק מחזיר
  רשימה ריקה (200), לא 404. סמנטיקה תקינה ל-collection-filter.
- missing-precedents/page.tsx: debounce (350ms) על שדות-הסינון — קוורי אחד
  אחרי שמפסיקים להקליד, לא אחד לכל הקשה.
- practice_area.py: regex סידורי \d{4}→\d{4,5}; case_serial_digits() +
  is_blam_by_number() (5⇒בל"מ); derive_subtype_with_blam ו-derive_proceeding_type
  מזהים בל"מ גם מ-5-ספרות (בנוסף לנושא). callers: cases.py, internal_decisions.py.
- proofreader.py: דפוסי חילוץ-שם-קובץ \d{3,4}→\d{3,5}.
- web-ui: practice-area.ts (מראָה ל-backend), schemas/case.ts (regex
  serial-month-year, 4-or-5 ספרות, superRefine 5⇒בל"מ), placeholder בוויזרד.
- תיעוד: docs/spec/X1-identifiers.md §1א + legal-ai/CLAUDE.md.

Invariants: מקיים G1 (נרמול-במקור — ספרה ראשונה כמקור-אמת יחיד לתחום),
G2 (מסלול-סיווג יחיד, אין כפילות), INV-DM/X1 (מפתח קנוני + proceeding_type).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-12 06:16:42 +00:00

331 lines
12 KiB
Python

"""Orchestrator for the Internal Committee Decisions corpus.
Ingest pipeline:
text/file → INSERT case_law (source_kind='internal_committee')
→ chunk → embed → store precedent_chunks
→ queue halacha extraction
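Migration helpers:
  migrate_from_style_corpus()    — re-index style_corpus entries as searchable
                                   internal committee decisions
  migrate_from_external_corpus() — reclassify external appeals-committee rows
                                   as internal_committee
  enrich_migrated_entries()      — run metadata + halacha extraction on entries
                                   that are still pending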
All ועדות ערר (any district) belong here.
Judicial decisions (Supreme Court, Administrative Court) stay in external_upload.
"""
from __future__ import annotations
import logging
from pathlib import Path
from uuid import UUID
from legal_mcp import config
from legal_mcp.services import db, embeddings, ingest
from legal_mcp.services.practice_area import derive_proceeding_type
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Directory for this corpus under the configured data dir; passed to
# _INTERNAL_SPEC as its staging root.
INTERNAL_DECISIONS_DIR = Path(config.DATA_DIR) / "internal-decisions"
# Accepted values for the ``practice_area`` input (the empty string is accepted).
# Registered under ``enum_fields`` in _INTERNAL_SPEC.
# NOTE(review): enforcement happens in the ingest module — confirm there.
_VALID_PRACTICE_AREAS = frozenset({"", "rishuy_uvniya", "betterment_levy", "compensation_197"})
# Accepted values for the ``district`` input (the empty string is accepted).
# These are the same district names that _COURT_TO_DISTRICT maps to.
_VALID_DISTRICTS = frozenset({"", "ירושלים", "מרכז", "תל אביב", "צפון", "דרום", "ארצי"})
_COURT_TO_DISTRICT = [
("ירושלים", "ירושלים"),
("תל אביב", "תל אביב"),
('ת"א', "תל אביב"),
("מרכז", "מרכז"),
("חיפה", "צפון"),
("צפון", "צפון"),
("דרום", "דרום"),
("ארצי", "ארצי"),
("ארצית", "ארצי"),
]
def _district_from_court(court: str) -> str:
for keyword, district in _COURT_TO_DISTRICT:
if keyword in court:
return district
return ""
def _internal_validate(inputs: dict) -> None:
if not (inputs.get("case_number") or "").strip():
raise ValueError("case_number is required")
def _internal_derive(inputs: dict) -> dict:
district = (inputs.get("district") or "").strip() or _district_from_court(inputs.get("court") or "")
proc = (inputs.get("proceeding_type") or "").strip() or derive_proceeding_type(
case_number=inputs.get("case_number") or "",
appeal_subtype=inputs.get("appeal_subtype") or "", subject=inputs.get("case_name") or "",
)
return {"district": district, "proceeding_type": proc}
async def _create_internal_record(**kw) -> dict:
    """Persist one internal-committee decision through the db layer.

    Normalises the keyword arguments and forwards them to
    ``db.create_internal_committee_decision``.

    Required keys: ``case_number``, ``case_name``, ``full_text``. Optional
    text fields are stripped and default to ``""``; ``is_binding`` defaults
    to ``True``; a blank ``proceeding_type`` falls back to "ערר".

    Returns:
        Whatever ``db.create_internal_committee_decision`` returns.

    Raises:
        KeyError: when one of the required keys is absent.
    """

    def _stripped(key: str) -> str:
        # Missing / None values collapse to an empty string before stripping.
        return (kw.get(key) or "").strip()

    record_fields = {
        "case_number": kw["case_number"].strip(),
        "case_name": kw["case_name"],
        "full_text": kw["full_text"],
        "court": _stripped("court"),
        "decision_date": kw.get("decision_date"),
        "chair_name": _stripped("chair_name"),
        "district": kw.get("district", ""),
        "practice_area": kw.get("practice_area", ""),
        "appeal_subtype": _stripped("appeal_subtype"),
        "subject_tags": list(kw.get("subject_tags") or []),
        "summary": _stripped("summary"),
        "is_binding": kw.get("is_binding", True),
        "document_id": kw.get("document_id"),
        "proceeding_type": kw.get("proceeding_type") or "ערר",
    }
    return await db.create_internal_committee_decision(**record_fields)
# Intake specification for this corpus; ingest_internal_decision() hands it to
# ingest.ingest_document().
# NOTE(review): how IntakeSpec interprets each field is defined in
# legal_mcp.services.ingest, not here — confirm field semantics there.
_INTERNAL_SPEC = ingest.IntakeSpec(
    source_kind="internal_committee",
    id_field="case_number",
    staging_root=INTERNAL_DECISIONS_DIR,
    # Sub-directory name is the district, or "other" when it is empty/missing.
    staging_subdir=lambda inputs: (inputs.get("district") or "other"),
    # Raises ValueError when case_number is blank.
    validate=_internal_validate,
    # Closed value sets for the two enumerated inputs.
    enum_fields={"practice_area": _VALID_PRACTICE_AREAS, "district": _VALID_DISTRICTS},
    # Supplies district / proceeding_type when the caller left them blank.
    derive=_internal_derive,
    display_name_fallback="case_number",
    create_record=_create_internal_record,
)
async def ingest_internal_decision(
    *,
    case_number: str,
    case_name: str = "",
    court: str = "",
    decision_date=None,
    chair_name: str = "",
    district: str = "",
    practice_area: str = "",
    appeal_subtype: str = "",
    subject_tags: list[str] | None = None,
    summary: str = "",
    is_binding: bool = True,
    file_path: str | Path | None = None,
    text: str | None = None,
    document_id: UUID | None = None,
    proceeding_type: str = "",
) -> dict:
    """Ingest a single appeals-committee decision.

    A thin wrapper: the metadata arguments are bundled into one ``inputs``
    mapping and handed, together with the document source (``file_path`` or
    ``text``) and ``document_id``, to ``ingest.ingest_document`` under
    ``_INTERNAL_SPEC``.

    Returns:
        A dict with ``status``, ``case_law_id`` and ``chunks`` copied from the
        pipeline result, plus ``halachot_pending`` which is always ``True``.
    """
    inputs = dict(
        case_number=case_number,
        case_name=case_name,
        court=court,
        decision_date=decision_date,
        chair_name=chair_name,
        district=district,
        practice_area=practice_area,
        appeal_subtype=appeal_subtype,
        subject_tags=subject_tags,
        summary=summary,
        is_binding=is_binding,
        proceeding_type=proceeding_type,
    )
    outcome = await ingest.ingest_document(
        _INTERNAL_SPEC,
        inputs=inputs,
        file_path=file_path,
        text=text,
        document_id=document_id,
    )
    report = {key: outcome[key] for key in ("status", "case_law_id", "chunks")}
    report["halachot_pending"] = True
    return report
async def migrate_from_style_corpus(dry_run: bool = False) -> dict:
    """Re-index all style_corpus entries as searchable internal committee decisions.

    Does NOT delete style_corpus rows — they remain for style analysis.
    Rows without a decision number, and rows whose case number already exists
    in case_law with source_kind='internal_committee', are skipped.

    Args:
        dry_run: when True nothing is ingested. The same skip rules are
            applied (the duplicate check is a read-only query), so the
            returned counters preview what a real run would do.

    Returns:
        A dict of counters: ``total`` (rows read), ``ingested`` (ingested, or
        would be ingested in a dry run), ``skipped``, ``failed``, plus the
        ``dry_run`` flag echoed back.
    """
    # style_corpus stores 'appeals_committee' (source_type) instead of practice_area,
    # so when the stored value is not a known practice area it is derived from
    # the appeal subtype. Built once, outside the row loop.
    known_practice_areas = ("rishuy_uvniya", "betterment_levy", "compensation_197")
    subtype_to_practice_area = {
        "building_permit": "rishuy_uvniya",
        "betterment_levy": "betterment_levy",
        "compensation_197": "compensation_197",
    }

    pool = await db.get_pool()
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            """SELECT decision_number, decision_date, full_text,
practice_area, appeal_subtype, subject_categories
FROM style_corpus
ORDER BY decision_date NULLS LAST"""
        )

    results = {"total": len(rows), "ingested": 0, "skipped": 0, "failed": 0, "dry_run": dry_run}
    for row in rows:
        case_number = (row["decision_number"] or "").strip()
        if not case_number:
            results["skipped"] += 1
            continue

        # Read-only duplicate check; it runs in dry-run mode too so that the
        # preview does not count rows a real run would skip.
        existing = await pool.fetchval(
            "SELECT id FROM case_law WHERE case_number = $1 AND source_kind = 'internal_committee'",
            case_number,
        )
        if existing:
            results["skipped"] += 1
            continue

        if dry_run:
            results["ingested"] += 1
            continue

        try:
            raw_pa = row["practice_area"] or ""
            subtype = row["appeal_subtype"] or ""
            if raw_pa in known_practice_areas:
                practice_area = raw_pa
            else:
                practice_area = subtype_to_practice_area.get(subtype, "")

            # Court, chair and district are fixed values for every migrated row.
            await ingest_internal_decision(
                case_number=case_number,
                court="ועדת הערר לתכנון ובנייה — מחוז ירושלים",
                decision_date=row["decision_date"],
                chair_name="דפנה תמיר",
                district="ירושלים",
                practice_area=practice_area,
                appeal_subtype=subtype,
                subject_tags=list(row["subject_categories"] or []),
                text=row["full_text"],
            )
            results["ingested"] += 1
            logger.info("Migrated style_corpus entry: %s", case_number)
        except Exception as e:
            # Best-effort migration: one bad row must not stop the rest.
            # logger.exception keeps the traceback for diagnosis.
            logger.exception("Failed to migrate %s: %s", case_number, e)
            results["failed"] += 1
    return results
async def migrate_from_external_corpus(dry_run: bool = False) -> dict:
    """Reclassify external appeals-committee decisions as internal_committee.

    Selects case_law rows with source_kind='external_upload' and
    source_type='appeals_committee', then sets their source_kind to
    'internal_committee'. The district is overwritten only when one can be
    derived from the court name. Only case_law is touched.

    Args:
        dry_run: when True nothing is written; the result additionally carries
            a ``preview`` list of ``case_number`` / ``court`` / derived
            ``district`` for each candidate row.

    Returns:
        A dict with ``total``, ``updated`` and ``dry_run`` (plus ``preview``
        in a dry run).
    """
    pool = await db.get_pool()
    async with pool.acquire() as conn:
        candidates = await conn.fetch(
            """SELECT id, case_number, court
FROM case_law
WHERE source_kind = 'external_upload'
AND source_type = 'appeals_committee'"""
        )

    candidate_count = len(candidates)
    results = {"total": candidate_count, "updated": 0, "dry_run": dry_run}

    if dry_run:
        results["updated"] = candidate_count
        preview = []
        for record in candidates:
            preview.append(
                {
                    "case_number": record["case_number"],
                    "court": record["court"],
                    "district": _district_from_court(record["court"] or ""),
                }
            )
        results["preview"] = preview
        return results

    async with pool.acquire() as conn:
        for record in candidates:
            derived_district = _district_from_court(record["court"] or "")
            # An empty derived district leaves the stored district untouched.
            await conn.execute(
                """UPDATE case_law
SET source_kind = 'internal_committee',
district = CASE WHEN $2 <> '' THEN $2 ELSE district END
WHERE id = $1""",
                record["id"], derived_district,
            )

    results["updated"] = candidate_count
    logger.info("Migrated %d external appeals-committee rows to internal_committee", candidate_count)
    return results
async def enrich_migrated_entries(dry_run: bool = False) -> dict:
    """One-time enrichment of internal_committee entries that are still waiting.

    Targets case_law rows with source_kind='internal_committee',
    halacha_extraction_status='pending' and no metadata extraction request
    recorded, oldest first. For each row it:

    1. runs ``precedent_metadata_extractor.extract_and_apply`` with
       ``overwrite_case_number=True``;
    2. queues halacha extraction via ``request_halacha_extraction``.

    A metadata failure is logged, counted under ``failed`` and the row is not
    queued. A queueing failure is logged only.

    Args:
        dry_run: when True only the number of candidate rows is reported.

    Returns:
        A dict with ``total``, ``metadata_updated``, ``halachot_queued``,
        ``failed`` and ``dry_run``.
    """
    from legal_mcp.services import precedent_metadata_extractor, db as _db

    pool = await _db.get_pool()
    async with pool.acquire() as conn:
        pending = await conn.fetch(
            """SELECT id, case_number
FROM case_law
WHERE source_kind = 'internal_committee'
AND halacha_extraction_status = 'pending'
AND metadata_extraction_requested_at IS NULL
ORDER BY created_at"""
        )

    results = dict(
        total=len(pending),
        metadata_updated=0,
        halachot_queued=0,
        failed=0,
        dry_run=dry_run,
    )
    if dry_run:
        return results

    metadata_ok = ("completed", "no_changes")
    for entry in pending:
        case_law_id = entry["id"]

        # Step 1: metadata extraction. A failure here skips the halacha step.
        try:
            meta = await precedent_metadata_extractor.extract_and_apply(
                case_law_id, overwrite_case_number=True
            )
            if meta.get("status") in metadata_ok:
                results["metadata_updated"] += 1
                logger.info(
                    "enrich_migrated: %s → fields=%s",
                    entry["case_number"], meta.get("fields"),
                )
        except Exception as e:
            logger.error("enrich_migrated metadata failed for %s: %s", entry["case_number"], e)
            results["failed"] += 1
            continue

        # Step 2: queue halacha extraction; failures are logged, not counted.
        try:
            await _db.request_halacha_extraction(case_law_id)
            results["halachot_queued"] += 1
        except Exception as e:
            logger.error("enrich_migrated halacha queue failed for %s: %s", entry["case_number"], e)

    return results
async def search_internal(
    query: str,
    *,
    practice_area: str = "",
    appeal_subtype: str = "",
    district: str = "",
    chair_name: str = "",
    limit: int = 10,
    include_halachot: bool = True,
) -> list[dict]:
    """Semantic search over internal committee decisions.

    Embeds the query and delegates to
    ``hybrid_search.search_precedent_library_hybrid`` with ``source_kind``
    fixed to "internal_committee"; the remaining arguments are passed through
    as filters.

    Returns:
        The hybrid-search results, or ``[]`` for a blank query (no embedding
        call is made in that case).
    """
    from legal_mcp.services import hybrid_search

    if not query.strip():
        return []

    filters = {
        "practice_area": practice_area,
        "appeal_subtype": appeal_subtype,
        "include_halachot": include_halachot,
        "source_kind": "internal_committee",
        "district": district,
        "chair_name": chair_name,
    }
    embedded_query = await embeddings.embed_query(query)
    return await hybrid_search.search_precedent_library_hybrid(
        query=query,
        query_text_embedding=embedded_query,
        limit=limit,
        **filters,
    )