"""MCP tools for RAG search over legal documents and decisions.""" from __future__ import annotations import json import logging from uuid import UUID from legal_mcp.services import db, embeddings, hybrid_search logger = logging.getLogger(__name__) async def search_decisions( query: str, limit: int = 10, section_type: str = "", practice_area: str = "", appeal_subtype: str = "", case_number: str = "", ) -> str: """חיפוש סמנטי בהחלטות קודמות ובמסמכים — מסונן לפי תחום משפטי. Args: query: שאילתת חיפוש בעברית limit: מספר תוצאות מקסימלי section_type: סינון לפי סוג סעיף (facts, legal_analysis, ...) practice_area: תחום משפטי לסינון (appeals_committee/national_insurance/...) appeal_subtype: סוג ערר לסינון (building_permit/betterment_levy/compensation_197) case_number: אם סופק, ה-practice_area/subtype יוסקו אוטומטית מהתיק """ # Auto-resolve practice_area from case_number if available if case_number and not practice_area: case = await db.get_case_by_number(case_number) if case: practice_area = case.get("practice_area") or "" appeal_subtype = appeal_subtype or (case.get("appeal_subtype") or "") if not practice_area: logger.warning( "search_decisions called without practice_area filter — " "results may mix legal domains" ) query_emb = await embeddings.embed_query(query) results = await hybrid_search.search_documents_hybrid( query=query, query_text_embedding=query_emb, limit=limit, section_type=section_type or None, practice_area=practice_area or None, appeal_subtype=appeal_subtype or None, ) if not results: return "לא נמצאו תוצאות." formatted = [] for r in results: formatted.append({ "score": round(float(r["score"]), 4), "case_number": r.get("case_number"), "document": r.get("document_title"), "section": r.get("section_type"), "page": r.get("page_number"), "content": r.get("content", ""), "match_type": r.get("match_type", "text"), "image_thumbnail": r.get("image_thumbnail_path"), }) return json.dumps(formatted, ensure_ascii=False, indent=2) async def search_case_documents( case_number: str, query: str, limit: int = 10, ) -> str: """חיפוש סמנטי בתוך מסמכי תיק ספציפי. Args: case_number: מספר תיק הערר query: שאילתת חיפוש limit: מספר תוצאות מקסימלי """ case = await db.get_case_by_number(case_number) if not case: return f"תיק {case_number} לא נמצא." query_emb = await embeddings.embed_query(query) # Restricted to case_id — practice_area filter would be redundant. results = await hybrid_search.search_documents_hybrid( query=query, query_text_embedding=query_emb, limit=limit, case_id=UUID(case["id"]), ) if not results: return f"לא נמצאו תוצאות בתיק {case_number}." formatted = [] for r in results: formatted.append({ "score": round(float(r["score"]), 4), "document": r.get("document_title"), "section": r.get("section_type"), "page": r.get("page_number"), "content": r.get("content", ""), "match_type": r.get("match_type", "text"), "image_thumbnail": r.get("image_thumbnail_path"), }) return json.dumps(formatted, ensure_ascii=False, indent=2) async def find_similar_cases( description: str, limit: int = 5, practice_area: str = "", appeal_subtype: str = "", case_number: str = "", ) -> str: """מציאת תיקים דומים על בסיס תיאור — מסונן לפי תחום משפטי. Args: description: תיאור התיק או הנושא limit: מספר תוצאות מקסימלי practice_area: תחום משפטי לסינון appeal_subtype: סוג ערר לסינון case_number: אם סופק, ה-practice_area/subtype יוסקו אוטומטית מהתיק """ if case_number and not practice_area: case = await db.get_case_by_number(case_number) if case: practice_area = case.get("practice_area") or "" appeal_subtype = appeal_subtype or (case.get("appeal_subtype") or "") if not practice_area: logger.warning( "find_similar_cases called without practice_area filter — " "results may mix legal domains" ) query_emb = await embeddings.embed_query(description) # Even with rerank we ask for ``limit*3`` so the dedup-by-case # step downstream still has enough rows to pick the best per case. results = await hybrid_search.search_documents_hybrid( query=description, query_text_embedding=query_emb, limit=limit * 3, practice_area=practice_area or None, appeal_subtype=appeal_subtype or None, ) if not results: return "לא נמצאו תיקים דומים." # Deduplicate by case_number, keep best score per case. # image-only rows still carry case_number from the join. seen_cases = {} for r in results: cn = r.get("case_number") if not cn: continue if cn not in seen_cases or r["score"] > seen_cases[cn]["score"]: seen_cases[cn] = r top_cases = sorted(seen_cases.values(), key=lambda x: x["score"], reverse=True)[:limit] formatted = [] for r in top_cases: formatted.append({ "score": round(float(r["score"]), 4), "case_number": r["case_number"], "document": r.get("document_title"), "relevant_section": (r.get("content") or "")[:500], "match_type": r.get("match_type", "text"), }) return json.dumps(formatted, ensure_ascii=False, indent=2)