Files
legal-ai/mcp-server/src/legal_mcp/tools/workflow.py
Chaim 79b9c37301 feat(mcp): FU-14 GAP-48 פרוסה 2 — envelope אחיד ל-11 משפחות-כלים
המשך מיגרציית INV-TOOL1 מעבר למשפחת-החיפוש (#71). הומרו ל-{status,data,message}:
precedent_library, citations, internal_decisions, missing_precedents,
training_enrichment, precedents, legal_arguments, cases, documents, workflow
(~55 כלים). בוטלו 5 עותקי _ok/_err משוכפלים (alias ל-tools/envelope.py — SSoT, G2).

עיקרון: envelope-status = הצלחת-הקריאה-לכלי; תוצאה-עסקית (idempotent_existing,
noop, completed...) נשמרת בתוך data. err רק לכשל אמיתי (not-found/invalid/exception).

תאימות-API: צרכני web/app.py של cases/workflow/precedents חוּוטו דרך
envelope_unwrap + בדיקת status=="error"→4xx — תשובת ה-HTTP זהה, web-ui לא מושפע.
(documents/legal_arguments/citations/... אינם נצרכים מ-app.py — agent-only.)

בדיקות: 182/182 עוברים (test_corpus_constraints עודכן לחוזה החדש).
נותר: משפחת drafting (מסלול הפקת-ההחלטה) בפרוסה נפרדת עם שער טסט-ייצוא.

Invariants: מקדם INV-TOOL1 + G2 (SSoT, ביטול כפילות). מתועד ב-X9 + gap-audit.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 17:41:39 +00:00

442 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""MCP tools for workflow: status, outcome, brainstorming, direction."""
from __future__ import annotations
import logging
from uuid import UUID
from legal_mcp.services import db
from legal_mcp.services.lessons import (
OUTCOME_LABELS_HE,
VALID_OUTCOMES,
canonical_outcome,
)
from legal_mcp.tools.envelope import empty, err, ok # GAP-48: SSoT envelope
logger = logging.getLogger(__name__)
async def workflow_status(case_number: str) -> str:
"""סטטוס תהליך עבודה מלא לתיק - מסמכים, עיבוד, טיוטות.
Args:
case_number: מספר תיק הערר
"""
case = await db.get_case_by_number(case_number)
if not case:
return err(f"תיק {case_number} לא נמצא.")
case_id = UUID(case["id"])
docs = await db.list_documents(case_id)
# Count chunks per document
pool = await db.get_pool()
async with pool.acquire() as conn:
chunk_counts = await conn.fetch(
"SELECT document_id, COUNT(*) as count FROM document_chunks WHERE case_id = $1 GROUP BY document_id",
case_id,
)
chunk_map = {str(r["document_id"]): r["count"] for r in chunk_counts}
doc_status = []
for doc in docs:
doc_status.append({
"title": doc["title"],
"type": doc["doc_type"],
"extraction": doc["extraction_status"],
"chunks": chunk_map.get(doc["id"], 0),
"pages": doc.get("page_count"),
})
# Check draft status
from pathlib import Path
from legal_mcp import config
case_dir = config.find_case_dir(case_number)
draft_path = case_dir / "drafts" / "decision.md"
has_draft = draft_path.exists()
draft_size = draft_path.stat().st_size if has_draft else 0
status = {
"case_number": case["case_number"],
"title": case["title"],
"status": case["status"],
"documents": doc_status,
"total_documents": len(docs),
"total_chunks": sum(chunk_map.values()),
"has_draft": has_draft,
"draft_size_bytes": draft_size,
"next_steps": _suggest_next_steps(case, docs, has_draft),
}
return ok(status)
def _suggest_next_steps(case: dict, docs: list, has_draft: bool) -> list[str]:
"""Suggest next steps based on case state."""
steps = []
doc_types = {d["doc_type"] for d in docs}
if not docs:
steps.append("העלה מסמכים לתיק (כתב ערר, תשובת ועדה)")
else:
if "appeal" not in doc_types:
steps.append("העלה כתב ערר")
if "response" not in doc_types:
steps.append("העלה תשובת ועדה/משיבים")
pending = [d for d in docs if d["extraction_status"] == "pending"]
if pending:
steps.append(f"עיבוד {len(pending)} מסמכים ממתינים")
if docs and not has_draft:
steps.append("התחל ניסוח טיוטת החלטה (/draft-decision)")
elif has_draft and case["status"] in ("new", "in_progress"):
steps.append("סקור ועדכן את הטיוטה")
steps.append("עדכן סטטוס ל-drafted")
if case["status"] == "drafted":
steps.append("סקירה סופית ועדכון סטטוס ל-reviewed")
elif case["status"] == "reviewed":
steps.append("אישור סופי ועדכון סטטוס ל-final")
return steps
async def get_metrics(case_number: str = "") -> str:
"""מדדי הצלחה — KPIs לתיק ספציפי או דשבורד כולל.
Args:
case_number: מספר תיק (אם ריק — דשבורד כולל)
"""
from legal_mcp.services import metrics
if case_number:
case = await db.get_case_by_number(case_number)
if not case:
return err(f"תיק {case_number} לא נמצא.")
result = await metrics.get_case_metrics(UUID(case["id"]))
else:
result = await metrics.get_dashboard()
return ok(result)
async def processing_status() -> str:
"""סטטוס כללי - מספר תיקים, מסמכים ממתינים לעיבוד."""
pool = await db.get_pool()
async with pool.acquire() as conn:
case_count = await conn.fetchval("SELECT COUNT(*) FROM cases")
doc_count = await conn.fetchval("SELECT COUNT(*) FROM documents")
pending_count = await conn.fetchval(
"SELECT COUNT(*) FROM documents WHERE extraction_status = 'pending'"
)
chunk_count = await conn.fetchval("SELECT COUNT(*) FROM document_chunks")
corpus_count = await conn.fetchval("SELECT COUNT(*) FROM style_corpus")
pattern_count = await conn.fetchval("SELECT COUNT(*) FROM style_patterns")
return ok({
"cases": case_count,
"documents": doc_count,
"pending_processing": pending_count,
"chunks": chunk_count,
"style_corpus_entries": corpus_count,
"style_patterns": pattern_count,
})
# ── Outcome & Brainstorming ───────────────────────────────────────
async def set_outcome(
case_number: str,
outcome: str,
reasoning: str = "",
) -> str:
"""הזנת תוצאה לתיק ערר. יוצר רשומת החלטה ומפעיל סיעור מוחות אם אין נימוק.
Args:
case_number: מספר תיק הערר
outcome: תוצאה — rejection (דחייה) / partial_acceptance (קבלה חלקית) /
full_acceptance (קבלה מלאה). ערכי-legacy (rejected/accepted/partial) ממופים אוטומטית.
reasoning: נימוק (אופציונלי). אם ריק — מפעיל סיעור מוחות.
"""
from legal_mcp.services import brainstorm
case = await db.get_case_by_number(case_number)
if not case:
return err(f"תיק {case_number} לא נמצא.")
# GAP-51: accept legacy vocabulary (rejected/accepted/partial), store canonical.
outcome = canonical_outcome(outcome)
if outcome not in VALID_OUTCOMES:
return err(f"תוצאה לא תקינה. אפשרויות: {', '.join(VALID_OUTCOMES)}")
case_id = UUID(case["id"])
# Create or update decision
existing = await db.get_decision_by_case(case_id)
if existing:
await db.update_decision(
UUID(existing["id"]),
outcome=outcome,
outcome_summary=reasoning[:200] if reasoning else "",
outcome_reasoning=reasoning,
)
decision = await db.get_decision(UUID(existing["id"]))
else:
decision = await db.create_decision(
case_id=case_id,
outcome=outcome,
outcome_summary=reasoning[:200] if reasoning else "",
outcome_reasoning=reasoning,
)
# Update case status
await db.update_case(case_id, status="in_progress", expected_outcome=outcome)
outcome_hebrew = OUTCOME_LABELS_HE.get(outcome, outcome)
result = {
"decision_id": decision["id"],
"outcome": outcome,
"outcome_hebrew": outcome_hebrew,
"reasoning": reasoning,
"has_reasoning": bool(reasoning),
}
if not reasoning:
result["message"] = "לא סופק נימוק. הפעל /brainstorm כדי לקבל כיוונים אפשריים."
result["next_step"] = "brainstorm"
else:
result["message"] = f"תוצאה נשמרה: {outcome_hebrew}. ניתן להתחיל כתיבת טיוטה."
result["next_step"] = "draft"
return ok(result)
async def brainstorm_directions(
case_number: str,
) -> str:
"""סיעור מוחות — הצגת טענות מרכזיות והצעת 2-3 כיוונים אפשריים לנימוק.
Args:
case_number: מספר תיק הערר
"""
from legal_mcp.services import brainstorm
case = await db.get_case_by_number(case_number)
if not case:
return err(f"תיק {case_number} לא נמצא.")
case_id = UUID(case["id"])
# Get existing decision for outcome
decision = await db.get_decision_by_case(case_id)
if not decision:
return err("לא הוזנה תוצאה לתיק. הפעל set_outcome קודם.")
outcome = decision.get("outcome", "")
reasoning = decision.get("outcome_reasoning", "")
directions = await brainstorm.generate_directions(case_id, outcome, reasoning)
# Save brainstorm results to decision
await db.update_decision(
UUID(decision["id"]),
direction_doc={"brainstorm": directions, "approved": False},
)
return ok(directions)
async def approve_direction(
case_number: str,
direction_index: int = 0,
additional_notes: str = "",
) -> str:
"""אישור כיוון — יוצר מסמך כיוון מאושר. לא ניתן להתחיל כתיבת דיון בלי כיוון מאושר.
Args:
case_number: מספר תיק הערר
direction_index: מספר הכיוון שנבחר (0 = ראשון)
additional_notes: הערות נוספות
"""
from legal_mcp.services import brainstorm
case = await db.get_case_by_number(case_number)
if not case:
return err(f"תיק {case_number} לא נמצא.")
case_id = UUID(case["id"])
decision = await db.get_decision_by_case(case_id)
if not decision:
return err("לא הוזנה תוצאה לתיק.")
direction_data = decision.get("direction_doc") or {}
brainstorm_result = direction_data.get("brainstorm", {})
if not brainstorm_result.get("directions"):
return err("לא בוצע סיעור מוחות. הפעל brainstorm_directions קודם.")
direction_doc = brainstorm.build_direction_doc(
outcome=decision.get("outcome", ""),
reasoning=decision.get("outcome_reasoning", ""),
directions_result=brainstorm_result,
selected_direction=direction_index,
additional_notes=additional_notes,
)
await db.update_decision(UUID(decision["id"]), direction_doc=direction_doc)
return ok({"direction": direction_doc},
message="כיוון אושר. ניתן להתחיל כתיבת טיוטה.")
async def ingest_final_version(
case_number: str,
file_path: str = "",
final_text: str = "",
) -> str:
"""קליטת גרסה סופית (שדפנה חתמה). משווה לטיוטה ומחלצת לקחים.
Args:
case_number: מספר תיק הערר
file_path: נתיב לקובץ הגרסה הסופית (PDF/DOCX)
final_text: טקסט ישיר (אם אין קובץ)
"""
from legal_mcp.services import learning_loop
case = await db.get_case_by_number(case_number)
if not case:
return err(f"תיק {case_number} לא נמצא.")
case_id = UUID(case["id"])
# Extract text from file if provided
if file_path and not final_text:
from legal_mcp.services import extractor
final_text, _, _ = await extractor.extract_text(file_path)
if not final_text:
return err("לא סופק טקסט — יש לספק file_path או final_text.")
try:
result = await learning_loop.process_final_version(case_id, final_text)
except ValueError as e:
return err(str(e))
# Auto-ingest into internal committee decisions corpus (best-effort).
try:
from legal_mcp.services import internal_decisions as int_svc
await int_svc.ingest_internal_decision(
case_number=case_number,
case_name=case.get("title", ""),
decision_date=case.get("decision_date"),
chair_name=case.get("chair_name", ""),
district="ירושלים",
practice_area=case.get("practice_area", ""),
appeal_subtype=case.get("appeal_subtype", ""),
text=final_text,
)
result["internal_corpus_ingested"] = True
except Exception as e:
logger.warning("ingest_final_version: internal corpus ingestion failed (non-fatal): %s", e)
result["internal_corpus_ingested"] = False
return ok(result)
# ── Chair feedback tools ──────────────────────────────────────────
async def record_chair_feedback(
case_number: str,
feedback_text: str,
block_id: str = "block-yod",
category: str = "missing_content",
lesson_extracted: str = "",
) -> str:
"""תיעוד הערת יו"ר (דפנה) על טיוטת החלטה.
Args:
case_number: מספר תיק הערר
feedback_text: ההערה של דפנה (מה חסר, מה לא נכון, מה צריך לשנות)
block_id: הבלוק שההערה מתייחסת אליו (ברירת מחדל: block-yod)
category: קטגוריה — missing_content/wrong_tone/wrong_structure/factual_error/style/other
lesson_extracted: הלקח שהופק מההערה (אם ברור כבר)
"""
case = await db.get_case_by_number(case_number)
case_id = UUID(case["id"]) if case else None
valid_categories = [
"missing_content", "wrong_tone", "wrong_structure",
"factual_error", "style", "other",
]
if category not in valid_categories:
return err(f"קטגוריה לא חוקית. אפשרויות: {', '.join(valid_categories)}")
feedback_id = await db.record_chair_feedback(
case_id=case_id,
block_id=block_id,
feedback_text=feedback_text,
category=category,
lesson_extracted=lesson_extracted,
)
return ok({
"feedback_id": str(feedback_id),
"next_steps": [
"כדי להפיק לקח מההערה, הפעל: analyze_chair_feedback",
"כדי לסמן כמטופל: resolve_chair_feedback",
],
}, message=f"הערה נרשמה בהצלחה. קטגוריה: {category}.")
async def list_chair_feedback(
case_number: str = "",
category: str = "",
unresolved_only: bool = True,
limit: int = 100,
) -> str:
"""הצגת הערות יו"ר שתועדו, עם אפשרות סינון.
Args:
case_number: סינון לפי תיק (אם ריק — כל ההערות)
category: סינון לפי קטגוריה
unresolved_only: האם להציג רק הערות שלא טופלו (ברירת מחדל: כן)
limit: תקרת תוצאות (INV-TOOL5 / GAP-53)
"""
case_id = None
if case_number:
case = await db.get_case_by_number(case_number)
if case:
case_id = UUID(case["id"])
feedbacks = await db.list_chair_feedback(
case_id=case_id,
category=category or None,
unresolved_only=unresolved_only,
limit=limit,
)
if not feedbacks:
return empty("אין הערות שמתאימות לסינון.")
items = []
for fb in feedbacks:
items.append({
"id": str(fb["id"]),
"case_id": str(fb["case_id"]) if fb["case_id"] else None,
"block_id": fb["block_id"],
"category": fb["category"],
"feedback": fb["feedback_text"],
"lesson": fb["lesson_extracted"],
"resolved": fb["resolved"],
"date": fb["created_at"].isoformat() if fb.get("created_at") else None,
})
return ok({
"total": len(items),
"feedbacks": items,
})