"""MCP tools for document management and processing.""" from __future__ import annotations import json import shutil from pathlib import Path from uuid import UUID from legal_mcp import config from legal_mcp.services import db, git_sync, processor async def document_upload( case_number: str, file_path: str, doc_type: str = "auto", title: str = "", ) -> str: """העלאה ועיבוד מסמך לתיק ערר. מחלץ טקסט, יוצר chunks ו-embeddings. Args: case_number: מספר תיק הערר file_path: נתיב מלא לקובץ (PDF, DOCX, RTF, TXT) doc_type: סוג מסמך (auto=סיווג אוטומטי, appeal=כתב ערר, response=תשובה, protocol=פרוטוקול, plan=תכנית, permit=היתר, court_decision=פסק דין, decision=החלטת ועדה, appraisal=שומה, objection=התנגדות, exhibit=נספח, reference=מסמך עזר) title: שם המסמך (אם ריק, ייקח משם הקובץ) """ case = await db.get_case_by_number(case_number) if not case: return f"תיק {case_number} לא נמצא." source = Path(file_path) if not source.exists(): return f"קובץ לא נמצא: {file_path}" case_id = UUID(case["id"]) if not title: title = source.stem # Copy file to case directory case_dir = config.find_case_dir(case_number) / "documents" / "originals" case_dir.mkdir(parents=True, exist_ok=True) dest = case_dir / source.name shutil.copy2(str(source), str(dest)) # For auto classification, start with "reference" — will be updated after processing initial_doc_type = doc_type if doc_type != "auto" else "reference" # Create document record doc = await db.create_document( case_id=case_id, doc_type=initial_doc_type, title=title, file_path=str(dest), ) # Process document (extract → classify → chunk → embed → store) result = await processor.process_document(UUID(doc["id"]), case_id) # If auto-classification, update doc_type from classification result actual_doc_type = initial_doc_type if doc_type == "auto" and result.get("classification"): classified_type = result["classification"].get("classification", {}).get("doc_type", "") if classified_type: actual_doc_type = classified_type await db.update_document(UUID(doc["id"]), doc_type=classified_type) doc["doc_type"] = classified_type # Git commit + push (best-effort — don't fail upload on git errors) try: repo_dir = config.find_case_dir(case_number) if repo_dir.exists(): doc_type_hebrew = { "appeal": "כתב ערר", "response": "תשובה", "protocol": "פרוטוקול", "plan": "תכנית", "permit": "היתר", "court_decision": "פסק דין", "decision": "החלטה", "appraisal": "שומה", "objection": "התנגדות", "exhibit": "נספח", "reference": "מסמך עזר", }.get(actual_doc_type, actual_doc_type) git_sync.commit_and_push(repo_dir, f"הוספת {doc_type_hebrew}: {title}") except Exception: pass # git not available in container — non-critical return json.dumps({ "document": doc, "processing": result, }, default=str, ensure_ascii=False, indent=2) async def document_upload_training( file_path: str, decision_number: str = "", decision_date: str = "", subject_categories: list[str] | None = None, title: str = "", practice_area: str = "appeals_committee", appeal_subtype: str = "", ) -> str: """העלאת החלטה קודמת של דפנה לקורפוס הסגנון (training). Args: file_path: נתיב מלא לקובץ ההחלטה decision_number: מספר ההחלטה decision_date: תאריך ההחלטה (YYYY-MM-DD) subject_categories: קטגוריות - אפשר לבחור כמה (בנייה, שימוש חורג, תכנית, היתר, הקלה, חלוקה, תמ"א 38, היטל השבחה, פיצויים 197) title: שם המסמך practice_area: תחום משפטי (appeals_committee / national_insurance / labor_law) appeal_subtype: סוג ערר (building_permit / betterment_levy / compensation_197). ריק = יוסק אוטומטית ממספר ההחלטה """ from datetime import date as date_type from legal_mcp.services import chunker, embeddings, extractor, practice_area as pa source = Path(file_path) if not source.exists(): return f"קובץ לא נמצא: {file_path}" if not title: title = source.stem # Resolve subtype: explicit > derived from decision_number > 'unknown' if not appeal_subtype: appeal_subtype = pa.derive_subtype(decision_number, practice_area) pa.validate(practice_area, appeal_subtype) # Copy to training directory, organized by subtype _SUBTYPE_DIRS = { "betterment_levy": "cmpa", "compensation_197": "cmpa", "building_permit": "cmp", } subdir = _SUBTYPE_DIRS.get(appeal_subtype, "") training_dest = config.TRAINING_DIR / subdir if subdir else config.TRAINING_DIR training_dest.mkdir(parents=True, exist_ok=True) dest = training_dest / source.name if source.resolve() != dest.resolve(): shutil.copy2(str(source), str(dest)) # Extract text and strip Nevo preamble text, page_count, _ = await extractor.extract_text(str(dest)) text = extractor.strip_nevo_preamble(text) # Parse date d_date = None if decision_date: d_date = date_type.fromisoformat(decision_date) # Add to style corpus (tagged by domain so block-writer can filter) corpus_id = await db.add_to_style_corpus( document_id=None, decision_number=decision_number, decision_date=d_date, subject_categories=subject_categories or [], full_text=text, practice_area=practice_area, appeal_subtype=appeal_subtype, ) # Chunk and embed for RAG search over training corpus chunks = chunker.chunk_document(text) if chunks: # Create a document record (no case association — tag explicitly) doc = await db.create_document( case_id=None, doc_type="decision", title=f"[קורפוס] {title}", file_path=str(dest), page_count=page_count, ) doc_id = UUID(doc["id"]) await db.update_document( doc_id, extracted_text=text, extraction_status="completed", metadata={"practice_area": practice_area, "appeal_subtype": appeal_subtype}, ) # Generate embeddings and store chunks texts = [c.content for c in chunks] embs = await embeddings.embed_texts(texts, input_type="document") chunk_dicts = [ { "content": c.content, "section_type": c.section_type, "embedding": emb, "page_number": c.page_number, "chunk_index": c.chunk_index, } for c, emb in zip(chunks, embs) ] await db.store_chunks(doc_id, None, chunk_dicts) return json.dumps({ "corpus_id": str(corpus_id), "title": title, "pages": page_count, "text_length": len(text), "chunks": len(chunks) if chunks else 0, }, default=str, ensure_ascii=False, indent=2) async def document_get_text(case_number: str, doc_title: str = "") -> str: """קבלת טקסט מלא של מסמך מתוך תיק. Args: case_number: מספר תיק הערר doc_title: שם המסמך (אם ריק, מחזיר את כל המסמכים) """ case = await db.get_case_by_number(case_number) if not case: return f"תיק {case_number} לא נמצא." docs = await db.list_documents(UUID(case["id"])) if not docs: return f"אין מסמכים בתיק {case_number}." if doc_title: docs = [d for d in docs if doc_title.lower() in d["title"].lower()] if not docs: return f"מסמך '{doc_title}' לא נמצא בתיק." results = [] for doc in docs: text = await db.get_document_text(UUID(doc["id"])) results.append({ "title": doc["title"], "doc_type": doc["doc_type"], "text": text[:10000] if text else "(ללא טקסט)", }) return json.dumps(results, ensure_ascii=False, indent=2) async def document_list(case_number: str) -> str: """רשימת מסמכים בתיק. Args: case_number: מספר תיק הערר """ case = await db.get_case_by_number(case_number) if not case: return f"תיק {case_number} לא נמצא." docs = await db.list_documents(UUID(case["id"])) if not docs: return f"אין מסמכים בתיק {case_number}." return json.dumps(docs, default=str, ensure_ascii=False, indent=2) async def extract_references( case_number: str, doc_title: str = "", ) -> str: """זיהוי תכניות, פסיקה וחקיקה מתוך מסמכי תיק. Args: case_number: מספר תיק הערר doc_title: שם מסמך ספציפי (אם ריק, מזהה בכל המסמכים) """ from legal_mcp.services import references_extractor case = await db.get_case_by_number(case_number) if not case: return f"תיק {case_number} לא נמצא." case_id = UUID(case["id"]) docs = await db.list_documents(case_id) if not docs: return f"אין מסמכים בתיק {case_number}." if doc_title: docs = [d for d in docs if doc_title.lower() in d["title"].lower()] results = [] for doc in docs: text = await db.get_document_text(UUID(doc["id"])) if not text: continue refs = await references_extractor.extract_and_link_references( UUID(doc["id"]), case_id, text, ) results.append({ "document": doc["title"], "plans": refs["plans"], "case_law": refs["case_law"], "case_law_linked": refs["case_law_linked"], "legislation": refs["legislation"], }) return json.dumps(results, default=str, ensure_ascii=False, indent=2) async def extract_claims( case_number: str, doc_title: str = "", party_hint: str = "", ) -> str: """חילוץ טענות מכתב טענות בתיק ושמירה ב-DB. Args: case_number: מספר תיק הערר doc_title: שם מסמך ספציפי (אם ריק, מחלץ מכל כתבי הטענות) party_hint: שם הצד המגיש (אם ידוע) """ from legal_mcp.services import claims_extractor case = await db.get_case_by_number(case_number) if not case: return f"תיק {case_number} לא נמצא." case_id = UUID(case["id"]) docs = await db.list_documents(case_id) if not docs: return f"אין מסמכים בתיק {case_number}." # Filter to claims documents (appeal, response) or specific doc if doc_title: docs = [d for d in docs if doc_title.lower() in d["title"].lower()] else: docs = [d for d in docs if d["doc_type"] in ("appeal", "response", "objection")] if not docs: return "לא נמצאו כתבי טענות בתיק." results = [] for doc in docs: text = await db.get_document_text(UUID(doc["id"])) if not text: continue result = await claims_extractor.extract_and_store_claims( case_id=case_id, document_id=UUID(doc["id"]), text=text, doc_type=doc["doc_type"], party_hint=party_hint, ) results.append(result) return json.dumps(results, default=str, ensure_ascii=False, indent=2) async def get_claims(case_number: str, party_role: str = "") -> str: """שליפת טענות שחולצו לתיק. Args: case_number: מספר תיק הערר party_role: סינון לפי צד (appellant/respondent/committee/permit_applicant). ריק = הכל. """ case = await db.get_case_by_number(case_number) if not case: return f"תיק {case_number} לא נמצא." claims = await db.get_claims( UUID(case["id"]), party_role=party_role if party_role else None, ) if not claims: return f"אין טענות בתיק {case_number}." # Format for display role_hebrew = { "appellant": "עוררים", "respondent": "משיבים", "committee": "ועדה מקומית", "permit_applicant": "מבקשי היתר", "appraiser": "שמאי", } formatted = [] for c in claims: formatted.append({ "party": role_hebrew.get(c["party_role"], c["party_role"]), "claim": c["claim_text"], "source": c.get("source_document", ""), }) return json.dumps(formatted, default=str, ensure_ascii=False, indent=2) # Whitelist of doc_type values; mirrors web/app.py:DOC_TYPE_NAMES. ALLOWED_DOC_TYPES = { "appeal", "response", "protocol", "plan", "decision", "court_decision", "permit", "appraisal", "exhibit", "objection", "reference", } # Allowed appraiser_side values; '' (empty) clears the tag. ALLOWED_APPRAISER_SIDES = {"committee", "appellant", "deciding", ""} async def document_update( case_number: str, doc_id: str, doc_type: str = "", appraiser_side: str = "", ) -> str: """עדכון תיוג מסמך — doc_type ו/או appraiser_side. ריק = אין שינוי. הולידציה זהה ל-PATCH endpoint ב-web/app.py. appraiser_side נשמר ב- documents.metadata JSONB (מתפרסם משם ע"י extract_appraiser_facts). Args: case_number: מספר תיק הערר (לאישור שייכות) doc_id: UUID של המסמך doc_type: ערך חדש (appeal/response/protocol/plan/decision/court_decision/ permit/appraisal/exhibit/objection/reference). ריק = אין שינוי. appraiser_side: ערך חדש (committee/appellant/deciding). ריק = אין שינוי; העבר במפורש מחרוזת ריקה לא-default אם רוצים לנקות. """ case = await db.get_case_by_number(case_number) if not case: return json.dumps({"status": "error", "message": f"תיק {case_number} לא נמצא."}, ensure_ascii=False, indent=2) try: doc_uuid = UUID(doc_id) except ValueError: return json.dumps({"status": "error", "message": f"doc_id לא תקין: {doc_id}"}, ensure_ascii=False, indent=2) doc = await db.get_document(doc_uuid) if not doc: return json.dumps({"status": "error", "message": f"מסמך {doc_id} לא נמצא."}, ensure_ascii=False, indent=2) if doc.get("case_id") != case["id"]: return json.dumps({"status": "error", "message": f"מסמך {doc_id} לא שייך לתיק {case_number}."}, ensure_ascii=False, indent=2) updates: dict = {} if doc_type: if doc_type not in ALLOWED_DOC_TYPES: return json.dumps({ "status": "error", "message": f"doc_type לא תקין: {doc_type}", "allowed": sorted(ALLOWED_DOC_TYPES), }, ensure_ascii=False, indent=2) updates["doc_type"] = doc_type # appraiser_side is optional. The MCP tool can't distinguish "skip" from # "set to empty string", so we use the convention: only update if non-empty. # To clear, the operator must edit metadata directly (rare). if appraiser_side: if appraiser_side not in ALLOWED_APPRAISER_SIDES: return json.dumps({ "status": "error", "message": f"appraiser_side לא תקין: {appraiser_side}", "allowed": sorted(s for s in ALLOWED_APPRAISER_SIDES if s), }, ensure_ascii=False, indent=2) metadata = doc.get("metadata") or {} if isinstance(metadata, str): metadata = json.loads(metadata) metadata["appraiser_side"] = appraiser_side updates["metadata"] = metadata if not updates: return json.dumps({"status": "noop", "message": "אין שינוי לבצע."}, ensure_ascii=False, indent=2) await db.update_document(doc_uuid, **updates) fresh = await db.get_document(doc_uuid) return json.dumps({ "status": "completed", "doc_id": doc_id, "doc_type": fresh.get("doc_type"), "metadata": fresh.get("metadata"), }, default=str, ensure_ascii=False, indent=2)