Files
legal-ai/mcp-server/src/legal_mcp/tools/search.py
Chaim aa0a736a7b feat(mcp): FU-14 GAP-48 פרוסה 1 — envelope אחיד (SSoT) + משפחת-חיפוש
INV-TOOL1: כלי-ה-MCP החזירו 3 מוסכמות סותרות (raw payload / {error} /
{status,message} אד-הוק) + 5 עותקי _ok/_err משוכפלים. נוצר tools/envelope.py
כמקור-אמת יחיד: ok/empty/err → {status,data,message}, כש-status מבחין
מפורשות הצלחה/ריק/שגיאה.

פרוסה 1 ממירה את משפחת-החיפוש (search_decisions, search_case_documents,
find_similar_cases, search_internal_decisions). web/app.py מפרק את המעטפת
דרך envelope_unwrap כדי לשמר את חוזה-ה-UI↔API (X6) ללא-שינוי — תשובת ה-HTTP
זהה (list על hits, {"message"} על ריק/שגיאה). טסט test_search_domain_scope
עודכן לחוזה החדש (5/5 עוברים).

החלטה: הדרגתי לפי-משפחה ולא big-bang. מפת-צרכנים: server.py pass-through,
web-ui מבודד (/api/*), רק 17 כלים נצרכים ישירות מ-app.py → סיכון מינימלי
לסוכנים החיים. ~73 כלים נותרו לפרוסות הבאות.

Invariants: מקדם INV-TOOL1 (envelope עקבי) + G2 (SSoT, ביטול כפילות _ok/_err).
לא נוגע ב-G1.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 16:32:07 +00:00

408 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""MCP tools for RAG search over legal documents and decisions."""
from __future__ import annotations
import logging
import time
from uuid import UUID
from legal_mcp.services import db, embeddings, hybrid_search, practice_area as pa, telemetry
from legal_mcp.tools.envelope import empty, err, ok
logger = logging.getLogger(__name__)
async def search_decisions(
    query: str,
    limit: int = 10,
    section_type: str = "",
    practice_area: str = "",
    appeal_subtype: str = "",
    case_number: str = "",
) -> str:
    """Semantic search over prior decisions and documents, scoped by legal domain.

    Args:
        query: Search query (Hebrew).
        limit: Maximum number of results.
        section_type: Optional section-type filter (facts, legal_analysis, ...).
        practice_area: Legal domain to filter by
            (appeals_committee/national_insurance/...).
        appeal_subtype: Appeal subtype to filter by
            (building_permit/betterment_levy/compensation_197).
        case_number: When supplied and ``practice_area`` is empty, the
            practice area / subtype are resolved from the case.

    Returns:
        The envelope built by ``ok`` (list of formatted hits), ``empty``
        (no hits) or ``err`` (case found but its domain is undeterminable).
    """
    # GAP-12 / INV-RET1: an explicit practice_area always wins. Otherwise the
    # domain is taken from the case so the search stays inside that case's
    # legal domain. A search without a case remains cross-domain.
    # NOTE(review): block nesting was reconstructed from a flattened copy of
    # this file — confirm the fallback and the blocking step belong under
    # ``if case``.
    resolved_case_id: UUID | None = None
    wants_case_scope = bool(case_number) and not practice_area
    case = await db.get_case_by_number(case_number) if wants_case_scope else None
    if case:
        practice_area = case.get("practice_area") or ""
        if not appeal_subtype:
            appeal_subtype = case.get("appeal_subtype") or ""
        try:
            resolved_case_id = UUID(case["id"])
        except (KeyError, ValueError, TypeError):
            resolved_case_id = None

        # The case row carried no practice_area: try the case-number prefix
        # (1xxx/8xxx/9xxx); the helper yields "" for unknown prefixes.
        practice_area = practice_area or pa.derive_domain_practice_area(case_number)

        # Still unknown — treat as a data anomaly and BLOCK instead of
        # silently running a cross-domain search for a specific case.
        if not practice_area:
            return err(
                f"לא ניתן לקבוע את התחום המשפטי (practice_area) של תיק "
                f"{case_number}. לתיק אין practice_area מוגדר ולא ניתן להסיק אותו "
                f"ממספר התיק. זוהי אנומליית נתונים — נא להגדיר את ה-practice_area "
                f"של התיק (למשל דרך case_update) לפני הרצת חיפוש מסונן לתיק זה."
            )

    if not practice_area:
        logger.warning(
            "search_decisions called without practice_area filter — "
            "results may mix legal domains"
        )

    query_emb = await embeddings.embed_query(query)

    # Only the hybrid search itself is timed (the embedding call is excluded).
    started = time.perf_counter()
    results = await hybrid_search.search_documents_hybrid(
        query=query,
        query_text_embedding=query_emb,
        limit=limit,
        section_type=section_type or None,
        practice_area=practice_area or None,
        appeal_subtype=appeal_subtype or None,
    )
    elapsed_ms = int((time.perf_counter() - started) * 1000)

    telemetry.log_search_bg(
        search_type="decisions",
        query=query,
        results=results,
        duration_ms=elapsed_ms,
        practice_area=practice_area or None,
        case_id=resolved_case_id,
        user_agent="unknown",
    )

    if not results:
        return empty("לא נמצאו תוצאות.")

    return ok([
        {
            "score": round(float(hit["score"]), 4),
            "case_number": hit.get("case_number"),
            "document": hit.get("document_title"),
            "section": hit.get("section_type"),
            "page": hit.get("page_number"),
            "content": hit.get("content", ""),
            "match_type": hit.get("match_type", "text"),
            "image_thumbnail": hit.get("image_thumbnail_path"),
        }
        for hit in results
    ])
async def search_case_documents(
    case_number: str,
    query: str,
    limit: int = 10,
) -> str:
    """Semantic search restricted to the documents of one specific case.

    Args:
        case_number: Appeal case number.
        query: Search query.
        limit: Maximum number of results.

    Returns:
        The envelope built by ``ok`` (list of formatted hits), ``empty``
        (no hits in the case) or ``err`` (case number not found).
    """
    case = await db.get_case_by_number(case_number)
    if not case:
        return err(f"תיק {case_number} לא נמצא.")

    case_uuid = UUID(case["id"])
    query_emb = await embeddings.embed_query(query)

    # The search is pinned to case_id, so a practice_area filter would add
    # nothing here.
    started = time.perf_counter()
    results = await hybrid_search.search_documents_hybrid(
        query=query,
        query_text_embedding=query_emb,
        limit=limit,
        case_id=case_uuid,
    )
    elapsed_ms = int((time.perf_counter() - started) * 1000)

    telemetry.log_search_bg(
        search_type="case_documents",
        query=query,
        results=results,
        duration_ms=elapsed_ms,
        case_id=case_uuid,
        user_agent="unknown",
    )

    if not results:
        return empty(f"לא נמצאו תוצאות בתיק {case_number}.")

    return ok([
        {
            "score": round(float(hit["score"]), 4),
            "document": hit.get("document_title"),
            "section": hit.get("section_type"),
            "page": hit.get("page_number"),
            "content": hit.get("content", ""),
            "match_type": hit.get("match_type", "text"),
            "image_thumbnail": hit.get("image_thumbnail_path"),
        }
        for hit in results
    ])
async def find_similar_cases(
    description: str,
    limit: int = 5,
    practice_area: str = "",
    appeal_subtype: str = "",
    case_number: str = "",
) -> str:
    """Find cases similar to a free-text description, scoped by legal domain.

    Args:
        description: Description of the case or topic.
        limit: Maximum number of results (one per case).
        practice_area: Legal domain to filter by.
        appeal_subtype: Appeal subtype to filter by.
        case_number: When supplied and ``practice_area`` is empty, the
            practice area / subtype are resolved from the case.

    Returns:
        The envelope built by ``ok`` (best hit per case, highest score
        first) or ``empty`` (the search returned nothing).
    """
    resolved_case_id: UUID | None = None
    wants_case_scope = bool(case_number) and not practice_area
    case = await db.get_case_by_number(case_number) if wants_case_scope else None
    if case:
        practice_area = case.get("practice_area") or ""
        if not appeal_subtype:
            appeal_subtype = case.get("appeal_subtype") or ""
        try:
            resolved_case_id = UUID(case["id"])
        except (KeyError, ValueError, TypeError):
            resolved_case_id = None

    if not practice_area:
        logger.warning(
            "find_similar_cases called without practice_area filter — "
            "results may mix legal domains"
        )

    query_emb = await embeddings.embed_query(description)

    # Over-fetch by 3x (even with rerank) so the per-case dedup below still
    # has enough rows to choose the best one for each case.
    started = time.perf_counter()
    results = await hybrid_search.search_documents_hybrid(
        query=description,
        query_text_embedding=query_emb,
        limit=limit * 3,
        practice_area=practice_area or None,
        appeal_subtype=appeal_subtype or None,
    )
    elapsed_ms = int((time.perf_counter() - started) * 1000)

    telemetry.log_search_bg(
        search_type="similar_cases",
        query=description,
        results=results,
        duration_ms=elapsed_ms,
        practice_area=practice_area or None,
        case_id=resolved_case_id,
        user_agent="unknown",
    )

    if not results:
        return empty("לא נמצאו תיקים דומים.")

    # Keep only the best-scoring row per case_number. Image-only rows still
    # carry case_number from the join; rows without one are skipped.
    best_by_case = {}
    for hit in results:
        number = hit.get("case_number")
        if not number:
            continue
        current = best_by_case.get(number)
        if current is None or hit["score"] > current["score"]:
            best_by_case[number] = hit

    ranked = sorted(
        best_by_case.values(),
        key=lambda row: row["score"],
        reverse=True,
    )

    return ok([
        {
            "score": round(float(row["score"]), 4),
            "case_number": row["case_number"],
            "document": row.get("document_title"),
            "relevant_section": (row.get("content") or "")[:500],
            "match_type": row.get("match_type", "text"),
        }
        for row in ranked[:limit]
    ])
async def search_internal_decisions(
    query: str,
    practice_area: str = "",
    appeal_subtype: str = "",
    district: str = "",
    chair_name: str = "",
    limit: int = 10,
    include_halachot: bool = True,
    include_cited_by: bool = False,
) -> str:
    """Search planning-and-building appeals-committee decisions (all districts).

    Args:
        query: Search query (Hebrew).
        practice_area: rishuy_uvniya / betterment_levy / compensation_197.
        appeal_subtype: Filter by appeal subtype.
        district: District filter — ירושלים / מרכז / תל אביב / צפון / דרום /
            ארצי. Empty means all districts.
        chair_name: Committee chair name to filter by. Empty means all chairs.
        limit: Maximum number of primary results.
        include_halachot: Whether to include extracted halachot (rules).
        include_cited_by: When True, after the main search also append the
            decisions that the primary hits cite (taken from
            precedent_internal_citations). Defaults to False so existing
            callers are unaffected. Such secondary rows are marked with
            ``match_type='cited_by'``.

    Returns:
        The envelope built by ``ok`` (primary rows followed by any cited-by
        stubs) or ``empty`` (the primary search returned nothing).
    """
    from legal_mcp.services import internal_decisions as int_svc

    # When expanding via citations, fetch a few extra primary hits: the
    # citation step is cheap and more seeds make the expansion more useful.
    primary_limit = limit if not include_cited_by else max(limit, limit * 2)

    t0 = time.perf_counter()
    results = await int_svc.search_internal(
        query,
        practice_area=practice_area,
        appeal_subtype=appeal_subtype,
        district=district,
        chair_name=chair_name,
        limit=primary_limit,
        include_halachot=include_halachot,
    )
    elapsed_ms = int((time.perf_counter() - t0) * 1000)

    telemetry.log_search_bg(
        search_type="internal_decisions",
        query=query,
        results=results,
        duration_ms=elapsed_ms,
        practice_area=practice_area or None,
        user_agent="unknown",
    )

    if not results:
        return empty("לא נמצאו החלטות ועדת ערר רלוונטיות.")

    # Cap primary results back to ``limit`` — the over-fetch only existed to
    # seed the citation expansion; the caller asked for ``limit`` items.
    primary = results[:limit]
    formatted = []
    seen_case_law_ids: set[str] = set()
    for r in primary:
        clid = str(r.get("case_law_id") or "")
        if clid:
            seen_case_law_ids.add(clid)
        formatted.append(_format_internal_row(r, match_type="primary"))

    if include_cited_by and seen_case_law_ids:
        from legal_mcp.services import citation_extractor

        try:
            source_uuids = [UUID(s) for s in seen_case_law_ids]
            cited_map = await citation_extractor.get_cited_case_law_ids(source_uuids)
        except Exception as e:
            logger.warning("include_cited_by lookup failed: %s", e)
            cited_map = {}

        # Flatten + dedup the cited ids that are not already primary hits.
        # Ids are normalised to str so the membership test against the
        # (string) primary set also works if the lookup yields non-str ids.
        cited_ids: set[str] = {
            str(cid)
            for ids in cited_map.values()
            for cid in ids
            if cid and str(cid) not in seen_case_law_ids
        }

        if cited_ids:
            # The expansion is best-effort, like the lookup above: a failure
            # here must not discard the primary hits already collected.
            try:
                cited_rows = await _fetch_case_law_summaries(sorted(cited_ids))
            except Exception as e:
                logger.warning("include_cited_by summary fetch failed: %s", e)
                cited_rows = []
            formatted.extend(
                _format_internal_row(row, match_type="cited_by")
                for row in cited_rows
            )

    return ok(formatted)
def _format_internal_row(r: dict, *, match_type: str = "primary") -> dict:
    """Shape an internal-decision hit (or a cited_by stub) for the MCP response.

    Args:
        r: Row mapping for a search hit or a cited-by stub.
        match_type: Label stored in the output's ``match_type`` field.

    Returns:
        A dict with the common metadata fields plus either the halacha
        fields (``rule``, ``quote``, ``rule_type``) when ``r["type"]`` is
        ``"halacha"``, or the passage fields (``content``, ``section``,
        ``page``) otherwise.
    """
    entry: dict = {
        # ``or 0.0`` also covers a row whose score key exists but is None,
        # which would otherwise make float() raise TypeError.
        "score": round(float(r.get("score") or 0.0), 4),
        "type": r.get("type", "passage"),
        "case_number": r.get("case_number"),
        "case_name": r.get("case_name"),
        "court": r.get("court"),
        "district": r.get("district"),
        "chair_name": r.get("chair_name"),
        "decision_date": r.get("decision_date"),
        "match_type": match_type,
    }
    if r.get("type") == "halacha":
        entry["rule"] = r.get("rule_statement")
        entry["quote"] = r.get("supporting_quote")
        entry["rule_type"] = r.get("rule_type")
    else:
        # A present-but-None content (e.g. a stub with no headnote) is
        # reported as "" so the field is always a string.
        entry["content"] = r.get("content") or ""
        entry["section"] = r.get("section_type")
        entry["page"] = r.get("page_number")
    return entry
async def _fetch_case_law_summaries(case_law_ids: list[str]) -> list[dict]:
    """Pull lightweight metadata for a set of case_law UUIDs (cited-by stubs).

    Doesn't pull chunks/halachot — the goal is to surface the existence of
    the related precedent, not to repeat search. The caller can drill in
    via search_internal_decisions with chair_name+case_number if they want
    full passages.

    Args:
        case_law_ids: Candidate case_law ids; entries that do not parse as
            a UUID are skipped.

    Returns:
        One dict per matching ``case_law`` row, with ``decision_date``
        converted to an ISO string when present, ``score`` fixed at 0.0 and
        ``type`` fixed at ``"passage"``. Empty list when no id is usable.
    """
    uuid_list = []
    for raw_id in case_law_ids:
        try:
            # str() lets UUID objects (and other non-str values) go through
            # the same parse path; anything malformed raises ValueError.
            uuid_list.append(UUID(str(raw_id)))
        except ValueError:
            continue
    if not uuid_list:
        return []

    # Acquire the pool only once there is something to query.
    pool = await db.get_pool()
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            """
            SELECT id::text AS case_law_id,
                   case_number,
                   case_name,
                   court,
                   district,
                   chair_name,
                   date AS decision_date,
                   headnote AS content
            FROM case_law
            WHERE id = ANY($1::uuid[])
            """,
            uuid_list,
        )

    out: list[dict] = []
    for r in rows:
        d = dict(r)
        if d.get("decision_date") is not None:
            d["decision_date"] = d["decision_date"].isoformat()
        # Stub rows show up with score 0 — they're not ranked, they're context.
        d["score"] = 0.0
        d["type"] = "passage"
        out.append(d)
    return out