"""MCP tools for workflow: status, outcome, brainstorming, direction.""" from __future__ import annotations import json import logging from uuid import UUID from legal_mcp.services import db logger = logging.getLogger(__name__) async def workflow_status(case_number: str) -> str: """סטטוס תהליך עבודה מלא לתיק - מסמכים, עיבוד, טיוטות. Args: case_number: מספר תיק הערר """ case = await db.get_case_by_number(case_number) if not case: return f"תיק {case_number} לא נמצא." case_id = UUID(case["id"]) docs = await db.list_documents(case_id) # Count chunks per document pool = await db.get_pool() async with pool.acquire() as conn: chunk_counts = await conn.fetch( "SELECT document_id, COUNT(*) as count FROM document_chunks WHERE case_id = $1 GROUP BY document_id", case_id, ) chunk_map = {str(r["document_id"]): r["count"] for r in chunk_counts} doc_status = [] for doc in docs: doc_status.append({ "title": doc["title"], "type": doc["doc_type"], "extraction": doc["extraction_status"], "chunks": chunk_map.get(doc["id"], 0), "pages": doc.get("page_count"), }) # Check draft status from pathlib import Path from legal_mcp import config case_dir = config.find_case_dir(case_number) draft_path = case_dir / "drafts" / "decision.md" has_draft = draft_path.exists() draft_size = draft_path.stat().st_size if has_draft else 0 status = { "case_number": case["case_number"], "title": case["title"], "status": case["status"], "documents": doc_status, "total_documents": len(docs), "total_chunks": sum(chunk_map.values()), "has_draft": has_draft, "draft_size_bytes": draft_size, "next_steps": _suggest_next_steps(case, docs, has_draft), } return json.dumps(status, ensure_ascii=False, indent=2) def _suggest_next_steps(case: dict, docs: list, has_draft: bool) -> list[str]: """Suggest next steps based on case state.""" steps = [] doc_types = {d["doc_type"] for d in docs} if not docs: steps.append("העלה מסמכים לתיק (כתב ערר, תשובת ועדה)") else: if "appeal" not in doc_types: steps.append("העלה כתב ערר") if "response" not in doc_types: steps.append("העלה תשובת ועדה/משיבים") pending = [d for d in docs if d["extraction_status"] == "pending"] if pending: steps.append(f"עיבוד {len(pending)} מסמכים ממתינים") if docs and not has_draft: steps.append("התחל ניסוח טיוטת החלטה (/draft-decision)") elif has_draft and case["status"] in ("new", "in_progress"): steps.append("סקור ועדכן את הטיוטה") steps.append("עדכן סטטוס ל-drafted") if case["status"] == "drafted": steps.append("סקירה סופית ועדכון סטטוס ל-reviewed") elif case["status"] == "reviewed": steps.append("אישור סופי ועדכון סטטוס ל-final") return steps async def get_metrics(case_number: str = "") -> str: """מדדי הצלחה — KPIs לתיק ספציפי או דשבורד כולל. Args: case_number: מספר תיק (אם ריק — דשבורד כולל) """ from legal_mcp.services import metrics if case_number: case = await db.get_case_by_number(case_number) if not case: return f"תיק {case_number} לא נמצא." result = await metrics.get_case_metrics(UUID(case["id"])) else: result = await metrics.get_dashboard() return json.dumps(result, default=str, ensure_ascii=False, indent=2) async def processing_status() -> str: """סטטוס כללי - מספר תיקים, מסמכים ממתינים לעיבוד.""" pool = await db.get_pool() async with pool.acquire() as conn: case_count = await conn.fetchval("SELECT COUNT(*) FROM cases") doc_count = await conn.fetchval("SELECT COUNT(*) FROM documents") pending_count = await conn.fetchval( "SELECT COUNT(*) FROM documents WHERE extraction_status = 'pending'" ) chunk_count = await conn.fetchval("SELECT COUNT(*) FROM document_chunks") corpus_count = await conn.fetchval("SELECT COUNT(*) FROM style_corpus") pattern_count = await conn.fetchval("SELECT COUNT(*) FROM style_patterns") return json.dumps({ "cases": case_count, "documents": doc_count, "pending_processing": pending_count, "chunks": chunk_count, "style_corpus_entries": corpus_count, "style_patterns": pattern_count, }, ensure_ascii=False, indent=2) # ── Outcome & Brainstorming ─────────────────────────────────────── async def set_outcome( case_number: str, outcome: str, reasoning: str = "", ) -> str: """הזנת תוצאה לתיק ערר. יוצר רשומת החלטה ומפעיל סיעור מוחות אם אין נימוק. Args: case_number: מספר תיק הערר outcome: תוצאה — rejected (דחייה), accepted (קבלה), partial (קבלה חלקית) reasoning: נימוק (אופציונלי). אם ריק — מפעיל סיעור מוחות. """ from legal_mcp.services import brainstorm case = await db.get_case_by_number(case_number) if not case: return f"תיק {case_number} לא נמצא." valid_outcomes = ("rejected", "accepted", "partial") if outcome not in valid_outcomes: return f"תוצאה לא תקינה. אפשרויות: {', '.join(valid_outcomes)}" case_id = UUID(case["id"]) # Create or update decision existing = await db.get_decision_by_case(case_id) if existing: await db.update_decision( UUID(existing["id"]), outcome=outcome, outcome_summary=reasoning[:200] if reasoning else "", outcome_reasoning=reasoning, ) decision = await db.get_decision(UUID(existing["id"])) else: decision = await db.create_decision( case_id=case_id, outcome=outcome, outcome_summary=reasoning[:200] if reasoning else "", outcome_reasoning=reasoning, ) # Update case status await db.update_case(case_id, status="in_progress", expected_outcome=outcome) outcome_hebrew = {"rejected": "דחייה", "accepted": "קבלה", "partial": "קבלה חלקית"}.get(outcome, outcome) result = { "decision_id": decision["id"], "outcome": outcome, "outcome_hebrew": outcome_hebrew, "reasoning": reasoning, "has_reasoning": bool(reasoning), } if not reasoning: result["message"] = "לא סופק נימוק. הפעל /brainstorm כדי לקבל כיוונים אפשריים." result["next_step"] = "brainstorm" else: result["message"] = f"תוצאה נשמרה: {outcome_hebrew}. ניתן להתחיל כתיבת טיוטה." result["next_step"] = "draft" return json.dumps(result, default=str, ensure_ascii=False, indent=2) async def brainstorm_directions( case_number: str, ) -> str: """סיעור מוחות — הצגת טענות מרכזיות והצעת 2-3 כיוונים אפשריים לנימוק. Args: case_number: מספר תיק הערר """ from legal_mcp.services import brainstorm case = await db.get_case_by_number(case_number) if not case: return f"תיק {case_number} לא נמצא." case_id = UUID(case["id"]) # Get existing decision for outcome decision = await db.get_decision_by_case(case_id) if not decision: return "לא הוזנה תוצאה לתיק. הפעל set_outcome קודם." outcome = decision.get("outcome", "") reasoning = decision.get("outcome_reasoning", "") directions = await brainstorm.generate_directions(case_id, outcome, reasoning) # Save brainstorm results to decision await db.update_decision( UUID(decision["id"]), direction_doc={"brainstorm": directions, "approved": False}, ) return json.dumps(directions, default=str, ensure_ascii=False, indent=2) async def approve_direction( case_number: str, direction_index: int = 0, additional_notes: str = "", ) -> str: """אישור כיוון — יוצר מסמך כיוון מאושר. לא ניתן להתחיל כתיבת דיון בלי כיוון מאושר. Args: case_number: מספר תיק הערר direction_index: מספר הכיוון שנבחר (0 = ראשון) additional_notes: הערות נוספות """ from legal_mcp.services import brainstorm case = await db.get_case_by_number(case_number) if not case: return f"תיק {case_number} לא נמצא." case_id = UUID(case["id"]) decision = await db.get_decision_by_case(case_id) if not decision: return "לא הוזנה תוצאה לתיק." direction_data = decision.get("direction_doc") or {} brainstorm_result = direction_data.get("brainstorm", {}) if not brainstorm_result.get("directions"): return "לא בוצע סיעור מוחות. הפעל brainstorm_directions קודם." direction_doc = brainstorm.build_direction_doc( outcome=decision.get("outcome", ""), reasoning=decision.get("outcome_reasoning", ""), directions_result=brainstorm_result, selected_direction=direction_index, additional_notes=additional_notes, ) await db.update_decision(UUID(decision["id"]), direction_doc=direction_doc) return json.dumps({ "status": "approved", "message": "כיוון אושר. ניתן להתחיל כתיבת טיוטה.", "direction": direction_doc, }, default=str, ensure_ascii=False, indent=2) async def ingest_final_version( case_number: str, file_path: str = "", final_text: str = "", ) -> str: """קליטת גרסה סופית (שדפנה חתמה). משווה לטיוטה ומחלצת לקחים. Args: case_number: מספר תיק הערר file_path: נתיב לקובץ הגרסה הסופית (PDF/DOCX) final_text: טקסט ישיר (אם אין קובץ) """ from legal_mcp.services import learning_loop case = await db.get_case_by_number(case_number) if not case: return f"תיק {case_number} לא נמצא." case_id = UUID(case["id"]) # Extract text from file if provided if file_path and not final_text: from legal_mcp.services import extractor final_text, _, _ = await extractor.extract_text(file_path) if not final_text: return "לא סופק טקסט — יש לספק file_path או final_text." try: result = await learning_loop.process_final_version(case_id, final_text) except ValueError as e: return json.dumps({"status": "error", "message": str(e)}, ensure_ascii=False, indent=2) # Auto-ingest into internal committee decisions corpus (best-effort). try: from legal_mcp.services import internal_decisions as int_svc await int_svc.ingest_internal_decision( case_number=case_number, case_name=case.get("title", ""), decision_date=case.get("decision_date"), chair_name=case.get("chair_name", ""), district="ירושלים", practice_area=case.get("practice_area", ""), appeal_subtype=case.get("appeal_subtype", ""), text=final_text, ) result["internal_corpus_ingested"] = True except Exception as e: logger.warning("ingest_final_version: internal corpus ingestion failed (non-fatal): %s", e) result["internal_corpus_ingested"] = False return json.dumps(result, default=str, ensure_ascii=False, indent=2) # ── Chair feedback tools ────────────────────────────────────────── async def record_chair_feedback( case_number: str, feedback_text: str, block_id: str = "block-yod", category: str = "missing_content", lesson_extracted: str = "", ) -> str: """תיעוד הערת יו"ר (דפנה) על טיוטת החלטה. Args: case_number: מספר תיק הערר feedback_text: ההערה של דפנה (מה חסר, מה לא נכון, מה צריך לשנות) block_id: הבלוק שההערה מתייחסת אליו (ברירת מחדל: block-yod) category: קטגוריה — missing_content/wrong_tone/wrong_structure/factual_error/style/other lesson_extracted: הלקח שהופק מההערה (אם ברור כבר) """ case = await db.get_case_by_number(case_number) case_id = UUID(case["id"]) if case else None valid_categories = [ "missing_content", "wrong_tone", "wrong_structure", "factual_error", "style", "other", ] if category not in valid_categories: return f"קטגוריה לא חוקית. אפשרויות: {', '.join(valid_categories)}" feedback_id = await db.record_chair_feedback( case_id=case_id, block_id=block_id, feedback_text=feedback_text, category=category, lesson_extracted=lesson_extracted, ) return json.dumps({ "status": "ok", "feedback_id": str(feedback_id), "message": f"הערה נרשמה בהצלחה. קטגוריה: {category}.", "next_steps": [ "כדי להפיק לקח מההערה, הפעל: analyze_chair_feedback", "כדי לסמן כמטופל: resolve_chair_feedback", ], }, ensure_ascii=False, indent=2) async def list_chair_feedback( case_number: str = "", category: str = "", unresolved_only: bool = True, limit: int = 100, ) -> str: """הצגת הערות יו"ר שתועדו, עם אפשרות סינון. Args: case_number: סינון לפי תיק (אם ריק — כל ההערות) category: סינון לפי קטגוריה unresolved_only: האם להציג רק הערות שלא טופלו (ברירת מחדל: כן) limit: תקרת תוצאות (INV-TOOL5 / GAP-53) """ case_id = None if case_number: case = await db.get_case_by_number(case_number) if case: case_id = UUID(case["id"]) feedbacks = await db.list_chair_feedback( case_id=case_id, category=category or None, unresolved_only=unresolved_only, limit=limit, ) if not feedbacks: return "אין הערות שמתאימות לסינון." items = [] for fb in feedbacks: items.append({ "id": str(fb["id"]), "case_id": str(fb["case_id"]) if fb["case_id"] else None, "block_id": fb["block_id"], "category": fb["category"], "feedback": fb["feedback_text"], "lesson": fb["lesson_extracted"], "resolved": fb["resolved"], "date": fb["created_at"].isoformat() if fb.get("created_at") else None, }) return json.dumps({ "total": len(items), "feedbacks": items, }, ensure_ascii=False, indent=2, default=str)