feat: add internal committee decisions corpus (source_kind='internal_committee')
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 1m31s

Three-layer separation: style learning (style_corpus), appeals-committee decisions
(internal_committee), and court rulings (external_upload).

- SCHEMA_V10: chair_name + district columns on case_law and cases, partial indexes
- create_internal_committee_decision() DB upsert function
- search_precedent_library_semantic() now accepts source_kind/district/chair_name params
- search_precedent_library_hybrid() passes through new params
- services/internal_decisions.py: ingest_internal_decision, migrate_from_style_corpus,
  migrate_from_external_corpus (identifies rows via source_type='appeals_committee')
- search_internal_decisions() MCP tool (server.py + tools/search.py)
- internal_decision_migrate() MCP admin tool
- Web endpoints: POST /api/internal-decisions/upload, POST /api/internal-decisions/migrate,
  GET /api/internal-decisions
- ingest_final_version auto-ingests finalized decisions into internal corpus
- SKILL.md updated: agents now search internal + external in parallel, present separately

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-04 18:33:39 +00:00
parent 1b14e04373
commit 92a2763b86
8 changed files with 718 additions and 15 deletions

View File

@@ -0,0 +1,301 @@
"""Orchestrator for the Internal Committee Decisions corpus.
Ingest pipeline:
text/file → INSERT case_law (source_kind='internal_committee')
→ chunk → embed → store precedent_chunks
→ queue halacha extraction
Migration helpers:
migrate_from_style_corpus() — re-index style_corpus entries as searchable
migrate_from_external_corpus() — reclassify external appeals-committee rows
All ועדות ערר (any district) belong here.
Judicial decisions (Supreme Court, Administrative Court) stay in external_upload.
"""
from __future__ import annotations
import logging
import re
import shutil
from datetime import date
from pathlib import Path
from uuid import UUID, uuid4
from legal_mcp import config
from legal_mcp.services import chunker, db, embeddings, extractor
logger = logging.getLogger(__name__)
INTERNAL_DECISIONS_DIR = Path(config.DATA_DIR) / "internal-decisions"
_VALID_DISTRICTS = {"", "ירושלים", "מרכז", "תל אביב", "צפון", "דרום", "ארצי"}
_COURT_TO_DISTRICT = [
("ירושלים", "ירושלים"),
("תל אביב", "תל אביב"),
('ת"א', "תל אביב"),
("מרכז", "מרכז"),
("חיפה", "צפון"),
("צפון", "צפון"),
("דרום", "דרום"),
("ארצי", "ארצי"),
("ארצית", "ארצי"),
]
def _coerce_date(value) -> date | None:
if value is None or value == "":
return None
if isinstance(value, date):
return value
if isinstance(value, str):
try:
return date.fromisoformat(value[:10])
except ValueError:
return None
return None
def _safe_filename(name: str) -> str:
base = Path(name).name
return re.sub(r"[^\w.\-+א-ת ]", "_", base) or f"internal-{uuid4().hex[:8]}"
def _district_from_court(court: str) -> str:
for keyword, district in _COURT_TO_DISTRICT:
if keyword in court:
return district
return ""
async def ingest_internal_decision(
*,
case_number: str,
case_name: str = "",
court: str = "",
decision_date=None,
chair_name: str = "",
district: str = "",
practice_area: str = "",
appeal_subtype: str = "",
subject_tags: list[str] | None = None,
summary: str = "",
is_binding: bool = True,
file_path: str | Path | None = None,
text: str | None = None,
document_id: UUID | None = None,
) -> dict:
"""Ingest an appeals-committee decision into the internal corpus.
Either file_path or text must be provided.
If district is empty, it is inferred from court.
Returns: {"status": "completed", "case_law_id": "...", "chunks": N}
"""
if not file_path and not text:
raise ValueError("either file_path or text is required")
if not case_number.strip():
raise ValueError("case_number is required")
resolved_district = district.strip() or _district_from_court(court)
if file_path:
src = Path(file_path)
if not src.is_file():
raise FileNotFoundError(f"file not found: {src}")
dest_dir = INTERNAL_DECISIONS_DIR / (resolved_district or "other")
dest_dir.mkdir(parents=True, exist_ok=True)
staged = dest_dir / f"{uuid4().hex[:8]}_{_safe_filename(src.name)}"
shutil.copy2(src, staged)
raw_text, page_count, page_offsets = await extractor.extract_text(str(staged))
raw_text = extractor.strip_nevo_preamble(raw_text or "").strip()
if not raw_text:
raise ValueError("no extractable text in file")
else:
raw_text = (text or "").strip()
if not raw_text:
raise ValueError("text is empty")
page_count = 0
page_offsets = None
record = await db.create_internal_committee_decision(
case_number=case_number.strip(),
case_name=(case_name.strip() or case_number.strip()),
full_text=raw_text,
court=court.strip(),
decision_date=_coerce_date(decision_date),
chair_name=chair_name.strip(),
district=resolved_district,
practice_area=practice_area,
appeal_subtype=appeal_subtype.strip(),
subject_tags=list(subject_tags or []),
summary=summary.strip(),
is_binding=is_binding,
document_id=document_id,
)
case_law_id = UUID(str(record["id"]))
try:
chunks = chunker.chunk_document(raw_text, page_offsets=page_offsets)
if not chunks:
await db.set_case_law_extraction_status(case_law_id, "completed")
await db.set_case_law_halacha_status(case_law_id, "completed")
return {"status": "completed", "case_law_id": str(case_law_id), "chunks": 0}
chunk_texts = [c.content for c in chunks]
chunk_vectors = await embeddings.embed_texts(chunk_texts, input_type="document")
chunk_dicts = [
{
"chunk_index": c.chunk_index,
"content": c.content,
"section_type": c.section_type,
"page_number": c.page_number,
"embedding": v,
}
for c, v in zip(chunks, chunk_vectors)
]
stored = await db.store_precedent_chunks(case_law_id, chunk_dicts)
await db.set_case_law_extraction_status(case_law_id, "completed")
await db.set_case_law_halacha_status(case_law_id, "pending")
await db.request_halacha_extraction(case_law_id)
return {
"status": "completed",
"case_law_id": str(case_law_id),
"chunks": stored,
"halachot_pending": True,
}
except Exception:
logger.exception("ingest_internal_decision failed for %s", case_number)
await db.set_case_law_extraction_status(case_law_id, "failed")
raise
async def migrate_from_style_corpus(dry_run: bool = False) -> dict:
"""Re-index all style_corpus entries as searchable internal committee decisions.
Does NOT delete style_corpus rows — they remain for style analysis.
Skips entries that already exist in case_law as internal_committee.
"""
pool = await db.get_pool()
async with pool.acquire() as conn:
rows = await conn.fetch(
"""SELECT decision_number, decision_date, full_text,
practice_area, appeal_subtype, subject_categories
FROM style_corpus
ORDER BY decision_date NULLS LAST"""
)
results = {"total": len(rows), "ingested": 0, "skipped": 0, "failed": 0, "dry_run": dry_run}
for row in rows:
case_number = (row["decision_number"] or "").strip()
if not case_number:
results["skipped"] += 1
continue
if not dry_run:
existing = await pool.fetchval(
"SELECT id FROM case_law WHERE case_number = $1 AND source_kind = 'internal_committee'",
case_number,
)
if existing:
results["skipped"] += 1
continue
if dry_run:
results["ingested"] += 1
continue
try:
subject_tags = list(row["subject_categories"] or [])
await ingest_internal_decision(
case_number=case_number,
court="ועדת הערר לתכנון ובנייה — מחוז ירושלים",
decision_date=row["decision_date"],
chair_name="דפנה תמיר",
district="ירושלים",
practice_area=row["practice_area"] or "",
appeal_subtype=row["appeal_subtype"] or "",
subject_tags=subject_tags,
text=row["full_text"],
)
results["ingested"] += 1
logger.info("Migrated style_corpus entry: %s", case_number)
except Exception as e:
logger.error("Failed to migrate %s: %s", case_number, e)
results["failed"] += 1
return results
async def migrate_from_external_corpus(dry_run: bool = False) -> dict:
"""Reclassify external appeals-committee decisions to source_kind='internal_committee'.
Identifies rows by source_type='appeals_committee' and updates source_kind + district.
Existing precedent_chunks remain — no re-embedding needed.
"""
pool = await db.get_pool()
async with pool.acquire() as conn:
rows = await conn.fetch(
"""SELECT id, case_number, court
FROM case_law
WHERE source_kind = 'external_upload'
AND source_type = 'appeals_committee'"""
)
results = {"total": len(rows), "updated": 0, "dry_run": dry_run}
if dry_run:
results["updated"] = len(rows)
results["preview"] = [
{"case_number": r["case_number"], "court": r["court"], "district": _district_from_court(r["court"] or "")}
for r in rows
]
return results
async with pool.acquire() as conn:
for row in rows:
district = _district_from_court(row["court"] or "")
await conn.execute(
"""UPDATE case_law
SET source_kind = 'internal_committee',
district = CASE WHEN $2 <> '' THEN $2 ELSE district END
WHERE id = $1""",
row["id"], district,
)
results["updated"] = len(rows)
logger.info("Migrated %d external appeals-committee rows to internal_committee", len(rows))
return results
async def search_internal(
query: str,
*,
practice_area: str = "",
appeal_subtype: str = "",
district: str = "",
chair_name: str = "",
limit: int = 10,
include_halachot: bool = True,
) -> list[dict]:
"""Semantic search over internal committee decisions."""
from legal_mcp.services import hybrid_search
if not query.strip():
return []
query_vec = await embeddings.embed_query(query)
return await hybrid_search.search_precedent_library_hybrid(
query=query,
query_text_embedding=query_vec,
limit=limit,
practice_area=practice_area,
appeal_subtype=appeal_subtype,
include_halachot=include_halachot,
source_kind="internal_committee",
district=district,
chair_name=chair_name,
)