diff --git a/mcp-server/src/legal_mcp/services/qa_validator.py b/mcp-server/src/legal_mcp/services/qa_validator.py
index e9cc0b7..10afe4a 100644
--- a/mcp-server/src/legal_mcp/services/qa_validator.py
+++ b/mcp-server/src/legal_mcp/services/qa_validator.py
@@ -18,6 +18,9 @@ import logging
 import re
 from uuid import UUID
 
+import anthropic
+
+from legal_mcp import config
 from legal_mcp.services import db
 
 logger = logging.getLogger(__name__)
@@ -85,13 +88,97 @@ def check_neutral_background(blocks: list[dict]) -> dict:
     }
 
 
-def check_claims_coverage(blocks: list[dict], claims: list[dict]) -> dict:
-    """בדיקה שכל טענה מכתבי הטענות המקוריים נענתה בבלוק י.
+_anthropic_client: anthropic.AsyncAnthropic | None = None
 
-    Uses keyword extraction: for each claim, extracts significant words
-    (nouns/verbs >3 chars) and checks if enough appear in the discussion.
-    Filters out block-zayin claims (those are from the final decision, not source).
-    """
+
+def _get_anthropic() -> anthropic.AsyncAnthropic:
+    """Return the shared async Anthropic client, creating it on first use."""
+    global _anthropic_client
+    if _anthropic_client is None:
+        _anthropic_client = anthropic.AsyncAnthropic(api_key=config.ANTHROPIC_API_KEY)
+    return _anthropic_client
+
+
+CLAIMS_CHECK_MODEL = "claude-haiku-4-5-20251001"
+CLAIMS_CHECK_MAX_TOKENS = 4096
+# Character caps that bound the prompt; text beyond them is not sent to the model.
+MAX_CLAIM_CHARS = 300
+MAX_DISCUSSION_CHARS = 12000
+
+CLAIMS_CHECK_PROMPT = """אתה בודק איכות החלטות משפטיות. קיבלת רשימת טענות שהועלו בכתבי הטענות, ואת בלוק הדיון של ההחלטה.
+
+## משימה:
+לכל טענה ממוספרת, קבע אם היא **נענתה** בדיון — גם אם בניסוח שונה.
+
+## קריטריונים:
+- "addressed" — הדיון מתייחס לנושא הטענה, גם אם במילים אחרות
+- "partial" — הדיון נוגע בנושא אך לא עונה ישירות
+- "missing" — הדיון לא מתייחס לטענה כלל
+
+## פלט JSON בלבד:
+{"results": [{"claim": 1, "status": "addressed|partial|missing", "where": "הפניה קצרה לאיפה בדיון"}]}
+"""
+
+
+def _claims_check_unavailable() -> dict:
+    """Return the result used when the model's verdicts cannot be obtained.
+
+    The check is reported as passed with a warning so that an API or parsing
+    problem does not block export.
+    """
+    return {"name": "claims_coverage", "passed": True,
+            "errors": ["שגיאה בפענוח תוצאות — לא ניתן לבדוק"], "severity": "warning"}
+
+
+def _parse_claim_statuses(raw: str, total: int) -> dict[int, str] | None:
+    """Extract a ``{claim number: status}`` map from the model's reply.
+
+    Returns ``None`` when the reply contains no JSON object with a ``results``
+    list. Entries that are not objects, or whose claim number is not an integer
+    in ``1..total``, are skipped.
+    """
+    # Taking the outermost braces also drops any markdown fence or prose
+    # wrapped around the JSON.
+    json_match = re.search(r"\{.*\}", raw, re.DOTALL)
+    if json_match is None:
+        return None
+    try:
+        parsed = json.loads(json_match.group())
+    except json.JSONDecodeError:
+        return None
+    results = parsed.get("results")
+    if not isinstance(results, list):
+        return None
+
+    statuses: dict[int, str] = {}
+    for entry in results:
+        if not isinstance(entry, dict):
+            continue
+        try:
+            number = int(entry.get("claim"))
+        except (TypeError, ValueError, OverflowError):
+            continue
+        if 1 <= number <= total:
+            statuses[number] = entry.get("status")
+    return statuses
+
+
+async def check_claims_coverage(blocks: list[dict], claims: list[dict]) -> dict:
+    """Check with Claude that every pleaded claim is answered in the discussion.
+
+    Args:
+        blocks: Decision blocks; the discussion is the block whose
+            ``block_id`` is ``"block-yod"``.
+        claims: Claim dicts. ``claim_text`` is sent to the model; claims whose
+            ``source_document`` is ``"block-zayin"`` are left out unless that
+            would leave no claims at all.
+
+    Returns:
+        A check-result dict with ``name``, ``passed``, ``errors`` and
+        ``severity``, plus ``details`` once verdicts were obtained. The check
+        passes while at most 20% of the claims are missing. If the model call
+        or its reply is unusable, the check passes with a warning.
+    """
     yod = next((b for b in blocks if b["block_id"] == "block-yod"), None)
     if not yod or not yod.get("content"):
         return {"name": "claims_coverage", "passed": False,
@@ -100,51 +187,68 @@ def check_claims_coverage(blocks: list[dict], claims: list[dict]) -> dict:
     if not claims:
         return {"name": "claims_coverage", "passed": True, "errors": [], "severity": "critical"}
 
-    # Filter: only claims from original pleadings, not from decision block-zayin
+    # Filter: only claims from original pleadings
     source_claims = [c for c in claims if c.get("source_document", "") != "block-zayin"]
     if not source_claims:
         source_claims = claims
 
-    yod_text = yod["content"].lower()
-    # Also check block-zayin (our written claims block) for coverage
-    zayin = next((b for b in blocks if b["block_id"] == "block-zayin"), None)
-    combined_text = yod_text
-    if zayin and zayin.get("content"):
-        combined_text += "\n" + zayin["content"].lower()
+    total = len(source_claims)
+    # A missing or null claim_text is sent as an empty claim instead of raising.
+    claim_texts = [c.get("claim_text") or "" for c in source_claims]
+    claims_text = "".join(
+        f"טענה #{i}: {text[:MAX_CLAIM_CHARS]}\n" for i, text in enumerate(claim_texts, 1)
+    )
+
+    # Bound the prompt size; discussion text past the cap is not sent.
+    discussion = yod["content"][:MAX_DISCUSSION_CHARS]
+
+    try:
+        # Awaiting the async client keeps the event loop free during the request.
+        message = await _get_anthropic().messages.create(
+            model=CLAIMS_CHECK_MODEL,
+            max_tokens=CLAIMS_CHECK_MAX_TOKENS,
+            messages=[{
+                "role": "user",
+                "content": f"""{CLAIMS_CHECK_PROMPT}
+
+## טענות ({total}):
+{claims_text}
+
+## בלוק הדיון:
+{discussion}""",
+            }],
+        )
+    except anthropic.APIError:
+        logger.exception("Claims check request failed")
+        return _claims_check_unavailable()
+
+    # Join every text part of the reply; parts without text contribute nothing.
+    raw = "".join(getattr(part, "text", None) or "" for part in message.content).strip()
+    statuses = _parse_claim_statuses(raw, total)
+    if not statuses:
+        logger.warning("Failed to parse claims check: %s", raw[:300])
+        # Fallback: assume all covered (don't block export on parse failure)
+        return _claims_check_unavailable()
+
+    # Only an explicit verdict counts as coverage: claims the model skipped or
+    # labelled with an unknown status are reported as missing.
+    claim_numbers = range(1, total + 1)
+    addressed = [n for n in claim_numbers if statuses.get(n) == "addressed"]
+    partial = [n for n in claim_numbers if statuses.get(n) == "partial"]
+    missing = [n for n in claim_numbers if statuses.get(n) not in ("addressed", "partial")]
 
     errors = []
-    # Common Hebrew stop words to skip
-    stop_words = {"את", "של", "על", "עם", "אל", "מן", "לא", "גם", "אם", "או",
-                  "כי", "זה", "זו", "אין", "יש", "הם", "היא", "הוא", "כל", "עוד",
-                  "רק", "אך", "אף", "לפי", "בין", "תוך", "מול", "ידי", "שלא"}
+    for idx in missing:
+        errors.append(f"טענה #{idx} לא נענתה: \"{claim_texts[idx - 1][:80]}...\"")
 
-    for claim in source_claims:
-        claim_text = claim.get("claim_text", "")
-        # Extract significant words (>3 chars, not stop words)
-        words = [w.strip(".,;:\"'()-") for w in claim_text.split()]
-        keywords = [w.lower() for w in words if len(w) > 3 and w.lower() not in stop_words]
-
-        if not keywords:
-            continue
-
-        # Check how many keywords appear in the discussion
-        found_count = sum(1 for kw in keywords if kw in combined_text)
-        coverage = found_count / len(keywords) if keywords else 0
-
-        # Require at least 25% keyword overlap
-        if coverage < 0.25:
-            short = claim_text[:80]
-            errors.append(f"טענה לא נענתה ({coverage:.0%}): \"{short}...\"")
-
-    total_source = len(source_claims)
-    covered = total_source - len(errors)
+    covered = len(addressed) + len(partial)
 
     return {
         "name": "claims_coverage",
-        "passed": len(errors) <= total_source * 0.2,  # Allow up to 20% uncovered
+        "passed": len(missing) <= total * 0.2,  # Allow up to 20% missing
         "errors": errors,
         "severity": "critical",
-        "details": f"{covered}/{total_source} טענות מכוסות ({covered/total_source*100:.0f}%)" if total_source else "",
+        "details": f"{covered}/{total} טענות נענו ({covered/total*100:.0f}%), {len(partial)} חלקית, {len(missing)} חסרות",
     }
 
 
@@ -295,14 +399,19 @@ async def validate_decision(case_id: UUID) -> dict:
     appeal_type = case.get("appeal_type", "licensing")
 
     # Run all checks
+    # Run sync checks
     results = [
         check_neutral_background(blocks),
-        check_claims_coverage(blocks, claims),
+    ]
+    # Async check: claims coverage with Claude
+    results.append(await check_claims_coverage(blocks, claims))
+    # More sync checks
+    results.extend([
         check_weight_compliance(blocks, appeal_type),
         check_structural_integrity(blocks),
         check_no_duplication(blocks),
         check_sequential_numbering(blocks),
-    ]
+    ])
 
     critical_failures = sum(1 for r in results if not r["passed"] and r["severity"] == "critical")
     all_passed = all(r["passed"] for r in results)