Files
legal-ai/mcp-server/src/legal_mcp/tools/workflow.py
Chaim 242e6cfd11
All checks were successful
G12 Leak-Guard / leak-guard (pull_request) Successful in 5s
fix(learning): chair_name במקור — סופי-ועדה תמיד נכנס לקורפוס-הפסיקה (TaskMaster #134)
הבאג: שלב-הלמידה (ingest_final_version → ingest_internal_decision) מוסיף כל
סופי כתקדים ציטוטי ב-case_law (source_kind=internal_committee), אך נכשל
בשקט (non-fatal warning) כש-cases.chair_name ריק — בגלל constraint
case_law_internal_chair_check. כך סופיים של 1194/1200/8070 לא נכנסו
לקורפוס-הפסיקה. שורש: (1) chair_name לא נקבע בפתיחת תיק; (2) מסלול-ה-MCP
העביר chair גולמי בעוד מסלול-ה-UI (web/) כבר פתר אותו דטרמיניסטית —
**מסלולים מקבילים מתפצלים (הפרת INV-G2)**; (3) הכשל נבלע (נגד §6).

תיקון-שורש (3 שכבות):
1. **SoT יחיד (INV-G2):** `config.committee_chair_for_case` — המקום היחיד
   שגם web/app.py וגם tools/workflow.py + db.create_case גוזרים ממנו chair
   (לפי תחילית מספר-התיק; override ל-env). web/ אחוד אליו (הוסרה הכפילות).
2. **נרמול-במקור (INV-G1):** `db.create_case` קובע chair_name תמיד לא-ריק;
   `cases.case_create` חושף param. `ingest_final_version` גוזר chair מה-SoT
   במקום הערך הגולמי → ה-constraint לא נופל.
3. **נראות (§6/feedback_silent_swallow):** כשל-העתק מוחזר ב-result
   (`internal_corpus_error`) ו-`final_learning_pipeline` מדפיס אזהרה — לא
   נבלע. backfill ל-11 תיקים עם chair ריק. `audit_corpus_integrity`:
   נוספו CHECK_D (תיקים מוכרעים ללא chair) + CHECK_E (סופי-final חסר
   מקורפוס-הפסיקה) — שניהם 0 כעת.

invariants: מקיים INV-G1 (נרמול בכתיבה), INV-G2 (מסלול-יחיד, אוחד web↔MCP),
§6 (אין בליעה שקטה). בדיקות: py_compile + 14 pytest (chair_seed_gate,
audit_provenance) + integration של create_case (default+override) + הרצת
ה-audit החי (A–E=0).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-12 07:25:54 +00:00

451 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""MCP tools for workflow: status, outcome, brainstorming, direction."""
from __future__ import annotations
import logging
from uuid import UUID
from legal_mcp.services import db
from legal_mcp.services.lessons import (
OUTCOME_LABELS_HE,
VALID_OUTCOMES,
canonical_outcome,
)
from legal_mcp.tools.envelope import empty, err, ok # GAP-48: SSoT envelope
logger = logging.getLogger(__name__)
async def workflow_status(case_number: str) -> str:
"""סטטוס תהליך עבודה מלא לתיק - מסמכים, עיבוד, טיוטות.
Args:
case_number: מספר תיק הערר
"""
case = await db.get_case_by_number(case_number)
if not case:
return err(f"תיק {case_number} לא נמצא.")
case_id = UUID(case["id"])
docs = await db.list_documents(case_id)
# Count chunks per document
pool = await db.get_pool()
async with pool.acquire() as conn:
chunk_counts = await conn.fetch(
"SELECT document_id, COUNT(*) as count FROM document_chunks WHERE case_id = $1 GROUP BY document_id",
case_id,
)
chunk_map = {str(r["document_id"]): r["count"] for r in chunk_counts}
doc_status = []
for doc in docs:
doc_status.append({
"title": doc["title"],
"type": doc["doc_type"],
"extraction": doc["extraction_status"],
"chunks": chunk_map.get(doc["id"], 0),
"pages": doc.get("page_count"),
})
# Check draft status
from pathlib import Path
from legal_mcp import config
case_dir = config.find_case_dir(case_number)
draft_path = case_dir / "drafts" / "decision.md"
has_draft = draft_path.exists()
draft_size = draft_path.stat().st_size if has_draft else 0
status = {
"case_number": case["case_number"],
"title": case["title"],
"status": case["status"],
"documents": doc_status,
"total_documents": len(docs),
"total_chunks": sum(chunk_map.values()),
"has_draft": has_draft,
"draft_size_bytes": draft_size,
"next_steps": _suggest_next_steps(case, docs, has_draft),
}
return ok(status)
def _suggest_next_steps(case: dict, docs: list, has_draft: bool) -> list[str]:
"""Suggest next steps based on case state."""
steps = []
doc_types = {d["doc_type"] for d in docs}
if not docs:
steps.append("העלה מסמכים לתיק (כתב ערר, תשובת ועדה)")
else:
if "appeal" not in doc_types:
steps.append("העלה כתב ערר")
if "response" not in doc_types:
steps.append("העלה תשובת ועדה/משיבים")
pending = [d for d in docs if d["extraction_status"] == "pending"]
if pending:
steps.append(f"עיבוד {len(pending)} מסמכים ממתינים")
if docs and not has_draft:
steps.append("התחל ניסוח טיוטת החלטה (/draft-decision)")
elif has_draft and case["status"] in ("new", "in_progress"):
steps.append("סקור ועדכן את הטיוטה")
steps.append("עדכן סטטוס ל-drafted")
if case["status"] == "drafted":
steps.append("סקירה סופית ועדכון סטטוס ל-reviewed")
elif case["status"] == "reviewed":
steps.append("אישור סופי ועדכון סטטוס ל-final")
return steps
async def get_metrics(case_number: str = "") -> str:
"""מדדי הצלחה — KPIs לתיק ספציפי או דשבורד כולל.
Args:
case_number: מספר תיק (אם ריק — דשבורד כולל)
"""
from legal_mcp.services import metrics
if case_number:
case = await db.get_case_by_number(case_number)
if not case:
return err(f"תיק {case_number} לא נמצא.")
result = await metrics.get_case_metrics(UUID(case["id"]))
else:
result = await metrics.get_dashboard()
return ok(result)
async def processing_status() -> str:
"""סטטוס כללי - מספר תיקים, מסמכים ממתינים לעיבוד."""
pool = await db.get_pool()
async with pool.acquire() as conn:
case_count = await conn.fetchval("SELECT COUNT(*) FROM cases")
doc_count = await conn.fetchval("SELECT COUNT(*) FROM documents")
pending_count = await conn.fetchval(
"SELECT COUNT(*) FROM documents WHERE extraction_status = 'pending'"
)
chunk_count = await conn.fetchval("SELECT COUNT(*) FROM document_chunks")
corpus_count = await conn.fetchval("SELECT COUNT(*) FROM style_corpus")
pattern_count = await conn.fetchval("SELECT COUNT(*) FROM style_patterns")
return ok({
"cases": case_count,
"documents": doc_count,
"pending_processing": pending_count,
"chunks": chunk_count,
"style_corpus_entries": corpus_count,
"style_patterns": pattern_count,
})
# ── Outcome & Brainstorming ───────────────────────────────────────
async def set_outcome(
case_number: str,
outcome: str,
reasoning: str = "",
) -> str:
"""הזנת תוצאה לתיק ערר. יוצר רשומת החלטה ומפעיל סיעור מוחות אם אין נימוק.
Args:
case_number: מספר תיק הערר
outcome: תוצאה — rejection (דחייה) / partial_acceptance (קבלה חלקית) /
full_acceptance (קבלה מלאה). ערכי-legacy (rejected/accepted/partial) ממופים אוטומטית.
reasoning: נימוק (אופציונלי). אם ריק — מפעיל סיעור מוחות.
"""
from legal_mcp.services import brainstorm
case = await db.get_case_by_number(case_number)
if not case:
return err(f"תיק {case_number} לא נמצא.")
# GAP-51: accept legacy vocabulary (rejected/accepted/partial), store canonical.
outcome = canonical_outcome(outcome)
if outcome not in VALID_OUTCOMES:
return err(f"תוצאה לא תקינה. אפשרויות: {', '.join(VALID_OUTCOMES)}")
case_id = UUID(case["id"])
# Create or update decision
existing = await db.get_decision_by_case(case_id)
if existing:
await db.update_decision(
UUID(existing["id"]),
outcome=outcome,
outcome_summary=reasoning[:200] if reasoning else "",
outcome_reasoning=reasoning,
)
decision = await db.get_decision(UUID(existing["id"]))
else:
decision = await db.create_decision(
case_id=case_id,
outcome=outcome,
outcome_summary=reasoning[:200] if reasoning else "",
outcome_reasoning=reasoning,
)
# Update case status
await db.update_case(case_id, status="in_progress", expected_outcome=outcome)
outcome_hebrew = OUTCOME_LABELS_HE.get(outcome, outcome)
result = {
"decision_id": decision["id"],
"outcome": outcome,
"outcome_hebrew": outcome_hebrew,
"reasoning": reasoning,
"has_reasoning": bool(reasoning),
}
if not reasoning:
result["message"] = "לא סופק נימוק. הפעל /brainstorm כדי לקבל כיוונים אפשריים."
result["next_step"] = "brainstorm"
else:
result["message"] = f"תוצאה נשמרה: {outcome_hebrew}. ניתן להתחיל כתיבת טיוטה."
result["next_step"] = "draft"
return ok(result)
async def brainstorm_directions(
case_number: str,
) -> str:
"""סיעור מוחות — הצגת טענות מרכזיות והצעת 2-3 כיוונים אפשריים לנימוק.
Args:
case_number: מספר תיק הערר
"""
from legal_mcp.services import brainstorm
case = await db.get_case_by_number(case_number)
if not case:
return err(f"תיק {case_number} לא נמצא.")
case_id = UUID(case["id"])
# Get existing decision for outcome
decision = await db.get_decision_by_case(case_id)
if not decision:
return err("לא הוזנה תוצאה לתיק. הפעל set_outcome קודם.")
outcome = decision.get("outcome", "")
reasoning = decision.get("outcome_reasoning", "")
directions = await brainstorm.generate_directions(case_id, outcome, reasoning)
# Save brainstorm results to decision
await db.update_decision(
UUID(decision["id"]),
direction_doc={"brainstorm": directions, "approved": False},
)
return ok(directions)
async def approve_direction(
case_number: str,
direction_index: int = 0,
additional_notes: str = "",
) -> str:
"""אישור כיוון — יוצר מסמך כיוון מאושר. לא ניתן להתחיל כתיבת דיון בלי כיוון מאושר.
Args:
case_number: מספר תיק הערר
direction_index: מספר הכיוון שנבחר (0 = ראשון)
additional_notes: הערות נוספות
"""
from legal_mcp.services import brainstorm
case = await db.get_case_by_number(case_number)
if not case:
return err(f"תיק {case_number} לא נמצא.")
case_id = UUID(case["id"])
decision = await db.get_decision_by_case(case_id)
if not decision:
return err("לא הוזנה תוצאה לתיק.")
direction_data = decision.get("direction_doc") or {}
brainstorm_result = direction_data.get("brainstorm", {})
if not brainstorm_result.get("directions"):
return err("לא בוצע סיעור מוחות. הפעל brainstorm_directions קודם.")
direction_doc = brainstorm.build_direction_doc(
outcome=decision.get("outcome", ""),
reasoning=decision.get("outcome_reasoning", ""),
directions_result=brainstorm_result,
selected_direction=direction_index,
additional_notes=additional_notes,
)
await db.update_decision(UUID(decision["id"]), direction_doc=direction_doc)
return ok({"direction": direction_doc},
message="כיוון אושר. ניתן להתחיל כתיבת טיוטה.")
async def ingest_final_version(
case_number: str,
file_path: str = "",
final_text: str = "",
) -> str:
"""קליטת גרסה סופית (שדפנה חתמה). משווה לטיוטה ומחלצת לקחים.
Args:
case_number: מספר תיק הערר
file_path: נתיב לקובץ הגרסה הסופית (PDF/DOCX)
final_text: טקסט ישיר (אם אין קובץ)
"""
from legal_mcp.services import learning_loop
case = await db.get_case_by_number(case_number)
if not case:
return err(f"תיק {case_number} לא נמצא.")
case_id = UUID(case["id"])
# Extract text from file if provided
if file_path and not final_text:
from legal_mcp.services import extractor
final_text, _, _ = await extractor.extract_text(file_path)
if not final_text:
return err("לא סופק טקסט — יש לספק file_path או final_text.")
try:
result = await learning_loop.process_final_version(case_id, final_text)
except ValueError as e:
return err(str(e))
# Auto-ingest into internal committee decisions corpus (best-effort).
# chair_name is resolved via the shared SoT (config.committee_chair_for_case)
# — the SAME resolver the FastAPI upload path uses — so the two paths cannot
# drift (INV-G2) and the DB chair constraint is never hit on an empty chair
# (INV-G1: chair normalised at source). Failures are surfaced, not swallowed
# (engineering rule §6 / feedback_silent_swallow): the result carries the
# reason and final_learning_pipeline prints it.
try:
from legal_mcp import config
from legal_mcp.services import internal_decisions as int_svc
await int_svc.ingest_internal_decision(
case_number=case_number,
case_name=case.get("title", ""),
decision_date=case.get("decision_date"),
chair_name=config.committee_chair_for_case(case, case_number),
district="ירושלים",
practice_area=case.get("practice_area", ""),
appeal_subtype=case.get("appeal_subtype", ""),
text=final_text,
)
result["internal_corpus_ingested"] = True
except Exception as e:
logger.warning(
"ingest_final_version: internal corpus ingestion failed (non-fatal): %s", e)
result["internal_corpus_ingested"] = False
result["internal_corpus_error"] = str(e)
return ok(result)
# ── Chair feedback tools ──────────────────────────────────────────
async def record_chair_feedback(
case_number: str,
feedback_text: str,
block_id: str = "block-yod",
category: str = "missing_content",
lesson_extracted: str = "",
) -> str:
"""תיעוד הערת יו"ר (דפנה) על טיוטת החלטה.
Args:
case_number: מספר תיק הערר
feedback_text: ההערה של דפנה (מה חסר, מה לא נכון, מה צריך לשנות)
block_id: הבלוק שההערה מתייחסת אליו (ברירת מחדל: block-yod)
category: קטגוריה — missing_content/wrong_tone/wrong_structure/factual_error/style/other
lesson_extracted: הלקח שהופק מההערה (אם ברור כבר)
"""
case = await db.get_case_by_number(case_number)
case_id = UUID(case["id"]) if case else None
valid_categories = [
"missing_content", "wrong_tone", "wrong_structure",
"factual_error", "style", "other",
]
if category not in valid_categories:
return err(f"קטגוריה לא חוקית. אפשרויות: {', '.join(valid_categories)}")
feedback_id = await db.record_chair_feedback(
case_id=case_id,
block_id=block_id,
feedback_text=feedback_text,
category=category,
lesson_extracted=lesson_extracted,
)
return ok({
"feedback_id": str(feedback_id),
"next_steps": [
"כדי להפיק לקח מההערה, הפעל: analyze_chair_feedback",
"כדי לסמן כמטופל: resolve_chair_feedback",
],
}, message=f"הערה נרשמה בהצלחה. קטגוריה: {category}.")
async def list_chair_feedback(
case_number: str = "",
category: str = "",
unresolved_only: bool = True,
limit: int = 100,
) -> str:
"""הצגת הערות יו"ר שתועדו, עם אפשרות סינון.
Args:
case_number: סינון לפי תיק (אם ריק — כל ההערות)
category: סינון לפי קטגוריה
unresolved_only: האם להציג רק הערות שלא טופלו (ברירת מחדל: כן)
limit: תקרת תוצאות (INV-TOOL5 / GAP-53)
"""
case_id = None
if case_number:
case = await db.get_case_by_number(case_number)
if case:
case_id = UUID(case["id"])
feedbacks = await db.list_chair_feedback(
case_id=case_id,
category=category or None,
unresolved_only=unresolved_only,
limit=limit,
)
if not feedbacks:
return empty("אין הערות שמתאימות לסינון.")
items = []
for fb in feedbacks:
items.append({
"id": str(fb["id"]),
"case_id": str(fb["case_id"]) if fb["case_id"] else None,
"block_id": fb["block_id"],
"category": fb["category"],
"feedback": fb["feedback_text"],
"lesson": fb["lesson_extracted"],
"resolved": fb["resolved"],
"date": fb["created_at"].isoformat() if fb.get("created_at") else None,
})
return ok({
"total": len(items),
"feedbacks": items,
})