Files
legal-ai/mcp-server/src/legal_mcp/tools/search.py
Chaim 2aee398b4a
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 1m35s
feat: Stage C — RAG advanced (#33, #47, #48, #49, #50, #51)
Six independent sub-tasks dispatched in parallel; aggregated here.

## #33 — Hide case_name column
library-list-panel.tsx: `<TableHead>` + `<TableCell>` for "שם"
get `className="hidden"` in both Court and Committee row variants.
DB column preserved for future use.

## #47 — Audit script periodic
New scripts/audit_corpus_integrity.py — 3 SQL checks (external+ערר
prefix, internal missing chair/district, cases.practice_area enum)
+ CEO wakeup on violations + cron `0 7 * * *`. First run: 0 issues.

## #48 — Parent-doc retrieval (gated, default off)
Schema V17: precedent_chunks.parent_chunk_id + chunk_role
('child'|'parent'). New chunker.chunk_document_hierarchical() —
section-aware parents (~1500 tokens) containing ~5 overlapping
children (~300 tokens each). New db.store_precedent_chunks_hierarchical
two-pass writer. Search SQL (semantic + lexical) LEFT-JOIN parent and
swap content + dedupe by parent_chunk_id when flag on. Toggle:
PARENT_DOC_RETRIEVAL_ENABLED + PARENT_DOC_{CHILD,PARENT}_SIZE_TOKENS.
Backfill ~3min and ~$0.20 — deferred to follow-up.

## #49 — Multimodal backfill
New scripts/backfill_multimodal_precedents.py with token-matching
case_number ↔ source files (PDF + DOCX via PyMuPDF). Ran in container:
26 precedents embedded, 503 pages, $0.21, 0 errors. precedent_image_embeddings
grew 3 → 29 rows. 44 remaining are style_corpus-migrated rows (no
source file on disk) — will catch up when re-uploaded.

## #50 — Closed-loop feedback + nDCG
Schema V18: search_logs + search_relevance_feedback. New telemetry.py
with fire-and-forget log_search_bg (p50 = 0.002ms — zero overhead) +
auto-infer_relevance_from_citations (reads case drafts → marks score=3
when cited precedent appears in past search top-K). Hooks added to 5
search paths. scripts/compute_ndcg.py for aggregation. Two admin API
endpoints (GET /api/admin/rag-metrics + POST .../infer). Dashboard UI
deferred — API is enough for now.

## #51 — Halacha quality monitoring
New scripts/monitor_halacha_quality.py — baseline avg confidence
(trusted=0.849, all=0.833, pending=0.694) with rolling window drift
detection. Default 5% threshold. Exits non-zero on alert for cron
integration. Recommended: `0 8 * * 1` weekly Mon 8am.

## Bonus: 230 unlinked citations → missing_precedents
Bulk-imported 230 distinct unlinked citations from
precedent_internal_citations to missing_precedents.status='open',
party='committee', with notes listing source citers. Top candidate:
ע"א 3213/97 (cited 5x). Total open missing_precedents now 237.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-26 11:26:52 +00:00

390 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""MCP tools for RAG search over legal documents and decisions."""
from __future__ import annotations
import json
import logging
import time
from uuid import UUID
from legal_mcp.services import db, embeddings, hybrid_search, telemetry
logger = logging.getLogger(__name__)
async def search_decisions(
query: str,
limit: int = 10,
section_type: str = "",
practice_area: str = "",
appeal_subtype: str = "",
case_number: str = "",
) -> str:
"""חיפוש סמנטי בהחלטות קודמות ובמסמכים — מסונן לפי תחום משפטי.
Args:
query: שאילתת חיפוש בעברית
limit: מספר תוצאות מקסימלי
section_type: סינון לפי סוג סעיף (facts, legal_analysis, ...)
practice_area: תחום משפטי לסינון (appeals_committee/national_insurance/...)
appeal_subtype: סוג ערר לסינון (building_permit/betterment_levy/compensation_197)
case_number: אם סופק, ה-practice_area/subtype יוסקו אוטומטית מהתיק
"""
# Auto-resolve practice_area from case_number if available
resolved_case_id: UUID | None = None
if case_number and not practice_area:
case = await db.get_case_by_number(case_number)
if case:
practice_area = case.get("practice_area") or ""
appeal_subtype = appeal_subtype or (case.get("appeal_subtype") or "")
try:
resolved_case_id = UUID(case["id"])
except (KeyError, ValueError, TypeError):
resolved_case_id = None
if not practice_area:
logger.warning(
"search_decisions called without practice_area filter — "
"results may mix legal domains"
)
query_emb = await embeddings.embed_query(query)
t0 = time.perf_counter()
results = await hybrid_search.search_documents_hybrid(
query=query,
query_text_embedding=query_emb,
limit=limit,
section_type=section_type or None,
practice_area=practice_area or None,
appeal_subtype=appeal_subtype or None,
)
elapsed_ms = int((time.perf_counter() - t0) * 1000)
telemetry.log_search_bg(
search_type="decisions",
query=query,
results=results,
duration_ms=elapsed_ms,
practice_area=practice_area or None,
case_id=resolved_case_id,
user_agent="unknown",
)
if not results:
return "לא נמצאו תוצאות."
formatted = []
for r in results:
formatted.append({
"score": round(float(r["score"]), 4),
"case_number": r.get("case_number"),
"document": r.get("document_title"),
"section": r.get("section_type"),
"page": r.get("page_number"),
"content": r.get("content", ""),
"match_type": r.get("match_type", "text"),
"image_thumbnail": r.get("image_thumbnail_path"),
})
return json.dumps(formatted, ensure_ascii=False, indent=2)
async def search_case_documents(
case_number: str,
query: str,
limit: int = 10,
) -> str:
"""חיפוש סמנטי בתוך מסמכי תיק ספציפי.
Args:
case_number: מספר תיק הערר
query: שאילתת חיפוש
limit: מספר תוצאות מקסימלי
"""
case = await db.get_case_by_number(case_number)
if not case:
return f"תיק {case_number} לא נמצא."
case_uuid = UUID(case["id"])
query_emb = await embeddings.embed_query(query)
# Restricted to case_id — practice_area filter would be redundant.
t0 = time.perf_counter()
results = await hybrid_search.search_documents_hybrid(
query=query,
query_text_embedding=query_emb,
limit=limit,
case_id=case_uuid,
)
elapsed_ms = int((time.perf_counter() - t0) * 1000)
telemetry.log_search_bg(
search_type="case_documents",
query=query,
results=results,
duration_ms=elapsed_ms,
case_id=case_uuid,
user_agent="unknown",
)
if not results:
return f"לא נמצאו תוצאות בתיק {case_number}."
formatted = []
for r in results:
formatted.append({
"score": round(float(r["score"]), 4),
"document": r.get("document_title"),
"section": r.get("section_type"),
"page": r.get("page_number"),
"content": r.get("content", ""),
"match_type": r.get("match_type", "text"),
"image_thumbnail": r.get("image_thumbnail_path"),
})
return json.dumps(formatted, ensure_ascii=False, indent=2)
async def find_similar_cases(
description: str,
limit: int = 5,
practice_area: str = "",
appeal_subtype: str = "",
case_number: str = "",
) -> str:
"""מציאת תיקים דומים על בסיס תיאור — מסונן לפי תחום משפטי.
Args:
description: תיאור התיק או הנושא
limit: מספר תוצאות מקסימלי
practice_area: תחום משפטי לסינון
appeal_subtype: סוג ערר לסינון
case_number: אם סופק, ה-practice_area/subtype יוסקו אוטומטית מהתיק
"""
resolved_case_id: UUID | None = None
if case_number and not practice_area:
case = await db.get_case_by_number(case_number)
if case:
practice_area = case.get("practice_area") or ""
appeal_subtype = appeal_subtype or (case.get("appeal_subtype") or "")
try:
resolved_case_id = UUID(case["id"])
except (KeyError, ValueError, TypeError):
resolved_case_id = None
if not practice_area:
logger.warning(
"find_similar_cases called without practice_area filter — "
"results may mix legal domains"
)
query_emb = await embeddings.embed_query(description)
# Even with rerank we ask for ``limit*3`` so the dedup-by-case
# step downstream still has enough rows to pick the best per case.
t0 = time.perf_counter()
results = await hybrid_search.search_documents_hybrid(
query=description,
query_text_embedding=query_emb,
limit=limit * 3,
practice_area=practice_area or None,
appeal_subtype=appeal_subtype or None,
)
elapsed_ms = int((time.perf_counter() - t0) * 1000)
telemetry.log_search_bg(
search_type="similar_cases",
query=description,
results=results,
duration_ms=elapsed_ms,
practice_area=practice_area or None,
case_id=resolved_case_id,
user_agent="unknown",
)
if not results:
return "לא נמצאו תיקים דומים."
# Deduplicate by case_number, keep best score per case.
# image-only rows still carry case_number from the join.
seen_cases = {}
for r in results:
cn = r.get("case_number")
if not cn:
continue
if cn not in seen_cases or r["score"] > seen_cases[cn]["score"]:
seen_cases[cn] = r
top_cases = sorted(seen_cases.values(), key=lambda x: x["score"], reverse=True)[:limit]
formatted = []
for r in top_cases:
formatted.append({
"score": round(float(r["score"]), 4),
"case_number": r["case_number"],
"document": r.get("document_title"),
"relevant_section": (r.get("content") or "")[:500],
"match_type": r.get("match_type", "text"),
})
return json.dumps(formatted, ensure_ascii=False, indent=2)
async def search_internal_decisions(
query: str,
practice_area: str = "",
appeal_subtype: str = "",
district: str = "",
chair_name: str = "",
limit: int = 10,
include_halachot: bool = True,
include_cited_by: bool = False,
) -> str:
"""חיפוש בהחלטות ועדות ערר לתכנון ובנייה (כל המחוזות).
Args:
query: שאילתת חיפוש בעברית
practice_area: rishuy_uvniya / betterment_levy / compensation_197
appeal_subtype: סינון לפי תת-סוג ערר
district: מחוז — ירושלים / מרכז / תל אביב / צפון / דרום / ארצי. ריק = כל המחוזות
chair_name: שם יו"ר הוועדה לסינון. ריק = כל היו"רים
limit: מספר תוצאות מקסימלי
include_halachot: האם לכלול הלכות שחולצו
include_cited_by: True = אחרי החיפוש הראשי, הוסף החלטות שה-hits
הראשיים מצטטים (מתוך precedent_internal_citations). default False
כדי לא לשבור caller-ים קיימים. match_type='cited_by' מציין שזו
תוצאה משנית.
"""
from legal_mcp.services import internal_decisions as int_svc
# Bump the limit a bit when we're expanding via citations — the
# citation step is cheap and a few extra primary hits make the
# expansion more useful.
primary_limit = limit if not include_cited_by else max(limit, limit * 2)
t0 = time.perf_counter()
results = await int_svc.search_internal(
query,
practice_area=practice_area,
appeal_subtype=appeal_subtype,
district=district,
chair_name=chair_name,
limit=primary_limit,
include_halachot=include_halachot,
)
elapsed_ms = int((time.perf_counter() - t0) * 1000)
telemetry.log_search_bg(
search_type="internal_decisions",
query=query,
results=results,
duration_ms=elapsed_ms,
practice_area=practice_area or None,
user_agent="unknown",
)
if not results:
return "לא נמצאו החלטות ועדת ערר רלוונטיות."
# Cap primary results back to ``limit`` (we over-fetched only to seed
# the citation expansion below — the user asked for ``limit`` items).
primary = results[:limit]
formatted = []
seen_case_law_ids: set[str] = set()
for r in primary:
clid = str(r.get("case_law_id") or "")
if clid:
seen_case_law_ids.add(clid)
formatted.append(_format_internal_row(r, match_type="primary"))
if include_cited_by and seen_case_law_ids:
from uuid import UUID
from legal_mcp.services import citation_extractor
try:
source_uuids = [UUID(s) for s in seen_case_law_ids]
cited_map = await citation_extractor.get_cited_case_law_ids(source_uuids)
except Exception as e:
logger.warning("include_cited_by lookup failed: %s", e)
cited_map = {}
# Flatten + dedup the cited case_law_ids that aren't already in
# the primary set.
cited_ids: set[str] = set()
for ids in cited_map.values():
for cid in ids:
if cid and cid not in seen_case_law_ids:
cited_ids.add(cid)
if cited_ids:
cited_rows = await _fetch_case_law_summaries(list(cited_ids))
for row in cited_rows:
formatted.append(_format_internal_row(row, match_type="cited_by"))
return json.dumps(formatted, ensure_ascii=False, indent=2)
def _format_internal_row(r: dict, *, match_type: str = "primary") -> dict:
"""Shape an internal-decision hit (or a cited_by stub) for the MCP response."""
entry: dict = {
"score": round(float(r.get("score", 0.0)), 4),
"type": r.get("type", "passage"),
"case_number": r.get("case_number"),
"case_name": r.get("case_name"),
"court": r.get("court"),
"district": r.get("district"),
"chair_name": r.get("chair_name"),
"decision_date": r.get("decision_date"),
"match_type": match_type,
}
if r.get("type") == "halacha":
entry["rule"] = r.get("rule_statement")
entry["quote"] = r.get("supporting_quote")
entry["rule_type"] = r.get("rule_type")
else:
entry["content"] = r.get("content", "")
entry["section"] = r.get("section_type")
entry["page"] = r.get("page_number")
return entry
async def _fetch_case_law_summaries(case_law_ids: list[str]) -> list[dict]:
"""Pull lightweight metadata for a set of case_law UUIDs (cited-by stubs).
Doesn't pull chunks/halachot — the goal is to surface the existence of
the related precedent, not to repeat search. The caller can drill in
via search_internal_decisions with chair_name+case_number if they want
full passages.
"""
from uuid import UUID
pool = await db.get_pool()
uuid_list = []
for s in case_law_ids:
try:
uuid_list.append(UUID(s))
except ValueError:
continue
if not uuid_list:
return []
async with pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT id::text AS case_law_id,
case_number,
case_name,
court,
district,
chair_name,
date AS decision_date,
headnote AS content
FROM case_law
WHERE id = ANY($1::uuid[])
""",
uuid_list,
)
out: list[dict] = []
for r in rows:
d = dict(r)
if d.get("decision_date") is not None:
d["decision_date"] = d["decision_date"].isoformat()
# Stub rows show up with score 0 — they're not ranked, they're context.
d["score"] = 0.0
d["type"] = "passage"
out.append(d)
return out