"""MCP tools for RAG search over legal documents and decisions.

Every public coroutine returns a ``str``: either a pretty-printed JSON array
of hits or a short Hebrew message when nothing was found.
"""

from __future__ import annotations

import json
import logging
from uuid import UUID

from legal_mcp.services import db, embeddings

logger = logging.getLogger(__name__)

# NOTE(review): the docstrings below were translated to English. If they are
# surfaced to clients as MCP tool descriptions, confirm the wording is OK.

# Number of decimal places kept when reporting a similarity score.
_SCORE_DECIMALS = 4
# find_similar_cases fetches this many times `limit` chunks so that enough
# distinct cases remain after collapsing several chunks of the same case.
_DEDUP_OVERFETCH_FACTOR = 3
# Maximum number of characters of chunk text shown per similar case.
_SNIPPET_CHARS = 500


async def _resolve_domain_filters(
    tool_name: str,
    practice_area: str,
    appeal_subtype: str,
    case_number: str,
) -> tuple[str, str]:
    """Fill in practice_area/appeal_subtype from the case when not given.

    The case is looked up only when ``case_number`` is set and no explicit
    ``practice_area`` was passed. An explicit ``appeal_subtype`` always wins
    over the one stored on the case. A warning is logged when the search will
    run without any practice-area filter.

    Returns:
        The ``(practice_area, appeal_subtype)`` pair to search with; either
        element may be an empty string.
    """
    if case_number and not practice_area:
        case = await db.get_case_by_number(case_number)
        if case:
            practice_area = case.get("practice_area") or ""
            appeal_subtype = appeal_subtype or (case.get("appeal_subtype") or "")

    if not practice_area:
        logger.warning(
            "%s called without practice_area filter — "
            "results may mix legal domains",
            tool_name,
        )
    return practice_area, appeal_subtype


def _as_uuid(value) -> UUID:
    """Return ``value`` as a UUID, accepting a UUID instance or its text form."""
    # UUID(<UUID instance>) raises AttributeError, so pass instances through.
    return value if isinstance(value, UUID) else UUID(str(value))


def _rounded_score(row) -> float:
    """Return the row's similarity score as a float rounded for display."""
    return round(float(row["score"]), _SCORE_DECIMALS)


def _to_json(rows: list[dict]) -> str:
    """Serialize hits as indented JSON, keeping Hebrew text unescaped."""
    return json.dumps(rows, ensure_ascii=False, indent=2)


async def search_decisions(
    query: str,
    limit: int = 10,
    section_type: str = "",
    practice_area: str = "",
    appeal_subtype: str = "",
    case_number: str = "",
) -> str:
    """Semantic search over prior decisions and documents, filtered by legal domain.

    Args:
        query: Search query (in Hebrew).
        limit: Maximum number of results.
        section_type: Filter by section type (facts, legal_analysis, ...).
        practice_area: Legal domain to filter by
            (appeals_committee/national_insurance/...).
        appeal_subtype: Appeal subtype to filter by
            (building_permit/betterment_levy/compensation_197).
        case_number: If provided and practice_area is empty, the
            practice_area/subtype are inferred from that case.

    Returns:
        A JSON array of hits (score, case_number, document, section, page,
        content), or a Hebrew "no results" message.
    """
    practice_area, appeal_subtype = await _resolve_domain_filters(
        "search_decisions", practice_area, appeal_subtype, case_number
    )

    query_emb = await embeddings.embed_query(query)
    # Empty-string filters mean "not set"; they are passed on as None.
    results = await db.search_similar(
        query_embedding=query_emb,
        limit=limit,
        section_type=section_type or None,
        practice_area=practice_area or None,
        appeal_subtype=appeal_subtype or None,
    )

    if not results:
        return "לא נמצאו תוצאות."

    return _to_json([
        {
            "score": _rounded_score(r),
            "case_number": r["case_number"],
            "document": r["document_title"],
            "section": r["section_type"],
            "page": r["page_number"],
            "content": r["content"],
        }
        for r in results
    ])


async def search_case_documents(
    case_number: str,
    query: str,
    limit: int = 10,
) -> str:
    """Semantic search within the documents of one specific case.

    Args:
        case_number: Appeal case number.
        query: Search query.
        limit: Maximum number of results.

    Returns:
        A JSON array of hits (score, document, section, page, content), or a
        Hebrew message when the case does not exist or nothing matched.
    """
    case = await db.get_case_by_number(case_number)
    if not case:
        return f"תיק {case_number} לא נמצא."

    query_emb = await embeddings.embed_query(query)
    # Restricted to case_id — a practice_area filter would be redundant.
    results = await db.search_similar(
        query_embedding=query_emb,
        limit=limit,
        case_id=_as_uuid(case["id"]),
    )

    if not results:
        return f"לא נמצאו תוצאות בתיק {case_number}."

    return _to_json([
        {
            "score": _rounded_score(r),
            "document": r["document_title"],
            "section": r["section_type"],
            "page": r["page_number"],
            "content": r["content"],
        }
        for r in results
    ])


async def find_similar_cases(
    description: str,
    limit: int = 5,
    practice_area: str = "",
    appeal_subtype: str = "",
    case_number: str = "",
) -> str:
    """Find similar cases from a free-text description, filtered by legal domain.

    Args:
        description: Description of the case or topic.
        limit: Maximum number of cases to return.
        practice_area: Legal domain to filter by.
        appeal_subtype: Appeal subtype to filter by.
        case_number: If provided and practice_area is empty, the
            practice_area/subtype are inferred from that case.

    Returns:
        A JSON array with one entry per case (score, case_number, document,
        relevant_section), best score first, or a Hebrew "no similar cases"
        message.
    """
    practice_area, appeal_subtype = await _resolve_domain_filters(
        "find_similar_cases", practice_area, appeal_subtype, case_number
    )

    query_emb = await embeddings.embed_query(description)
    results = await db.search_similar(
        query_embedding=query_emb,
        # Fetch extra rows: several may belong to the same case.
        limit=limit * _DEDUP_OVERFETCH_FACTOR,
        practice_area=practice_area or None,
        appeal_subtype=appeal_subtype or None,
    )

    if not results:
        return "לא נמצאו תיקים דומים."

    # Collapse to one row per case_number, keeping the highest-scoring row.
    best_by_case = {}
    for r in results:
        cn = r["case_number"]
        if cn not in best_by_case or r["score"] > best_by_case[cn]["score"]:
            best_by_case[cn] = r

    top_cases = sorted(
        best_by_case.values(), key=lambda row: row["score"], reverse=True
    )[:limit]

    return _to_json([
        {
            "score": _rounded_score(r),
            "case_number": r["case_number"],
            "document": r["document_title"],
            # Tolerate a missing/None content instead of raising TypeError.
            "relevant_section": (r["content"] or "")[:_SNIPPET_CHARS],
        }
        for r in top_cases
    ])