diff --git a/mcp-server/src/legal_mcp/services/local_classifier.py b/mcp-server/src/legal_mcp/services/local_classifier.py new file mode 100644 index 0000000..09a10f8 --- /dev/null +++ b/mcp-server/src/legal_mcp/services/local_classifier.py @@ -0,0 +1,105 @@ +"""Local document classifier — rule-based, no API calls. + +Classifies legal documents by filename patterns and content keywords. +Falls back to Claude Code headless (`claude -p`) for ambiguous cases. +""" + +from __future__ import annotations + +import json +import logging +import re +import subprocess +from pathlib import Path + +logger = logging.getLogger(__name__) + +# ── Filename patterns (checked in order, first match wins) ──────── + +_FILENAME_RULES: list[tuple[str, str, float]] = [ + # (regex pattern on filename, doc_type, confidence) + (r"כתב.ערר|כתב-ערר", "appeal", 1.0), + (r"תשובה|תשובת|תגובת|השלמת.טיעון|בקשה.להשלמת", "response", 1.0), + (r"פרוטוקול", "protocol", 1.0), + (r"החלטת?.ביניים|החלטה.לתיקון", "decision", 0.95), + (r"הוראות.תכנית|תכנית", "plan", 1.0), + (r"היתר", "permit", 1.0), + (r"שומה|חוו.ת.דעת", "appraisal", 1.0), + (r"התנגדות", "objection", 1.0), + # Court decisions: case number patterns + (r"(?:עעם|עע.?מ|עתמ|עת.?מ|בג.?צ|בבנ|עא|ע.?א|רעא|רע.?א|עעמ|עתמ)", "court_decision", 1.0), + # ערר + number that's NOT part of our case files (i.e. precedent references) + (r"^ערר.?\d", "court_decision", 0.9), +] + +# ── Content patterns (first 500 chars) ─────────────────────────── + +_CONTENT_RULES: list[tuple[str, str, float]] = [ + (r"בפני\s+ועדת\s+הערר|לפנינו\s+ערר|ניתנה?\s+היום", "decision", 0.85), + (r"כתב\s+ערר|העורר.{0,20}מגיש", "appeal", 0.85), + (r"כתב\s+תשובה|המשיב.{0,20}משיב", "response", 0.85), + (r"פרוטוקול\s+(?:דיון|ישיבה|ועדה)", "protocol", 0.9), + (r"בית\s+(?:ה)?משפט|פסק\s+דין|השופט", "court_decision", 0.85), + (r"הוראות\s+(?:ה)?תכנית|תב.עה|ייעוד\s+הקרקע", "plan", 0.8), +] + + +def classify(filename: str, text: str = "") -> tuple[str, float]: + """Classify a legal document by filename and content. + + Returns (doc_type, confidence). Confidence > 0.8 means high certainty. + """ + name = Path(filename).stem + + # Try filename rules + for pattern, doc_type, confidence in _FILENAME_RULES: + if re.search(pattern, name): + logger.info("Local classifier: '%s' → %s (filename, %.2f)", name, doc_type, confidence) + return doc_type, confidence + + # Try content rules (first 500 chars) + snippet = text[:500] if text else "" + for pattern, doc_type, confidence in _CONTENT_RULES: + if re.search(pattern, snippet): + logger.info("Local classifier: '%s' → %s (content, %.2f)", name, doc_type, confidence) + return doc_type, confidence + + logger.info("Local classifier: '%s' → reference (no match, 0.3)", name) + return "reference", 0.3 + + +def classify_with_claude_code(filename: str, text: str) -> tuple[str, float]: + """Fallback: use Claude Code headless to classify ambiguous documents. + + Only works when `claude` CLI is available (not in Docker). + """ + prompt = ( + "סווג את המסמך המשפטי הבא לאחת הקטגוריות הבאות בלבד:\n" + "appeal, response, protocol, decision, plan, permit, appraisal, " + "court_decision, exhibit, objection, reference\n\n" + f"שם הקובץ: {filename}\n" + f"תחילת המסמך:\n{text[:500]}\n\n" + 'החזר JSON בלבד: {"doc_type": "...", "confidence": 0.9}' + ) + + try: + result = subprocess.run( + ["claude", "-p", prompt, "--output-format", "json", "--max-turns", "1"], + capture_output=True, text=True, timeout=60, + ) + if result.returncode == 0 and result.stdout.strip(): + data = json.loads(result.stdout) + # claude -p --output-format json wraps in {"result": "..."} + inner = data.get("result", data) + if isinstance(inner, str): + inner = json.loads(inner) + doc_type = inner.get("doc_type", "reference") + confidence = float(inner.get("confidence", 0.7)) + logger.info("Claude Code classifier: '%s' → %s (%.2f)", filename, doc_type, confidence) + return doc_type, confidence + except FileNotFoundError: + logger.debug("Claude CLI not available — skipping headless fallback") + except (subprocess.TimeoutExpired, json.JSONDecodeError, Exception) as e: + logger.warning("Claude Code classifier failed: %s", e) + + return "reference", 0.3 diff --git a/mcp-server/src/legal_mcp/services/processor.py b/mcp-server/src/legal_mcp/services/processor.py index 5ea84bb..5a26357 100644 --- a/mcp-server/src/legal_mcp/services/processor.py +++ b/mcp-server/src/legal_mcp/services/processor.py @@ -37,39 +37,22 @@ async def process_document(document_id: UUID, case_id: UUID) -> dict: page_count=page_count, ) - # Step 1.5: Classify document and identify parties (non-fatal) + # Step 1.5: Classify document — local rules first, Claude Code headless fallback classification_result = {} try: - logger.info("Classifying document") - case_number = "" - if case_id: - case = await db.get_case(case_id) - if case: - case_number = case.get("case_number", "") - classification_result = await classifier.classify_and_identify(text, case_number) - await db.update_document( - document_id, - metadata=classification_result, - ) - logger.info( - "Classification: %s (confidence: %.2f), parties found: %d appellants, %d respondents", - classification_result["classification"].get("doc_type", "?"), - classification_result["classification"].get("confidence", 0), - len(classification_result["parties"].get("appellants", [])), - len(classification_result["parties"].get("respondents", [])), - ) + from legal_mcp.services import local_classifier + filename = Path(doc["file_path"]).name + doc_type, confidence = local_classifier.classify(filename, text) + if confidence < 0.8: + doc_type, confidence = local_classifier.classify_with_claude_code(filename, text) - # Update case parties if empty - if case_id and case: - parties = classification_result.get("parties", {}) - updates = {} - if not case.get("appellants") and parties.get("appellants"): - updates["appellants"] = parties["appellants"] - if not case.get("respondents") and parties.get("respondents"): - updates["respondents"] = parties["respondents"] - if updates: - await db.update_case(case_id, **updates) - logger.info("Updated case parties: %s", updates) + # Update doc_type if we got a good classification and current type is generic + if confidence >= 0.5 and doc.get("doc_type") in ("reference", "auto"): + await db.update_document(document_id, doc_type=doc_type) + logger.info("Auto-classified: %s → %s (confidence %.2f)", filename, doc_type, confidence) + + classification_result = {"classification": {"doc_type": doc_type, "confidence": confidence}} + await db.update_document(document_id, metadata=classification_result) except Exception as e: logger.warning("Classification failed (non-fatal): %s", e)