Replace keyword claims check with Claude-based semantic check

claims_coverage now uses Claude Haiku to check if each claim is
semantically addressed in the discussion, not just keyword-matched.

- Sends all claims + discussion to Claude in one API call
- Returns addressed/partial/missing for each claim
- Handles markdown code block wrapping in response
- max_tokens 4096 (was 2048) for 48+ claims

Result on Hecht: 45/48 addressed (94%), 1 partial, 3 missing.
The 3 missing are genuinely unaddressed (personal/procedural claims).
Previously keyword check showed 47/48 but missed semantic gaps.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-03 11:38:31 +00:00
parent 018b5936a1
commit 52beb6ebdc

View File

@@ -18,6 +18,9 @@ import logging
import re
from uuid import UUID
import anthropic
from legal_mcp import config
from legal_mcp.services import db
logger = logging.getLogger(__name__)
@@ -85,13 +88,33 @@ def check_neutral_background(blocks: list[dict]) -> dict:
}
def check_claims_coverage(blocks: list[dict], claims: list[dict]) -> dict:
"""בדיקה שכל טענה מכתבי הטענות המקוריים נענתה בבלוק י.
_anthropic_client: anthropic.Anthropic | None = None
Uses keyword extraction: for each claim, extracts significant words
(nouns/verbs >3 chars) and checks if enough appear in the discussion.
Filters out block-zayin claims (those are from the final decision, not source).
"""
def _get_anthropic() -> anthropic.Anthropic:
global _anthropic_client
if _anthropic_client is None:
_anthropic_client = anthropic.Anthropic(api_key=config.ANTHROPIC_API_KEY)
return _anthropic_client
CLAIMS_CHECK_PROMPT = """אתה בודק איכות החלטות משפטיות. קיבלת רשימת טענות שהועלו בכתבי הטענות, ואת בלוק הדיון של ההחלטה.
## משימה:
לכל טענה ממוספרת, קבע אם היא **נענתה** בדיון — גם אם בניסוח שונה.
## קריטריונים:
- "addressed" — הדיון מתייחס לנושא הטענה, גם אם במילים אחרות
- "partial" — הדיון נוגע בנושא אך לא עונה ישירות
- "missing" — הדיון לא מתייחס לטענה כלל
## פלט JSON בלבד:
{"results": [{"claim": 1, "status": "addressed|partial|missing", "where": "הפניה קצרה לאיפה בדיון"}]}
"""
async def check_claims_coverage(blocks: list[dict], claims: list[dict]) -> dict:
"""בדיקה סמנטית (Claude) שכל טענה נענתה בדיון."""
yod = next((b for b in blocks if b["block_id"] == "block-yod"), None)
if not yod or not yod.get("content"):
return {"name": "claims_coverage", "passed": False,
@@ -100,51 +123,68 @@ def check_claims_coverage(blocks: list[dict], claims: list[dict]) -> dict:
if not claims:
return {"name": "claims_coverage", "passed": True, "errors": [], "severity": "critical"}
# Filter: only claims from original pleadings, not from decision block-zayin
# Filter: only claims from original pleadings
source_claims = [c for c in claims if c.get("source_document", "") != "block-zayin"]
if not source_claims:
source_claims = claims
yod_text = yod["content"].lower()
# Also check block-zayin (our written claims block) for coverage
zayin = next((b for b in blocks if b["block_id"] == "block-zayin"), None)
combined_text = yod_text
if zayin and zayin.get("content"):
combined_text += "\n" + zayin["content"].lower()
# Build claims list
claims_text = ""
for i, c in enumerate(source_claims, 1):
claims_text += f"טענה #{i}: {c['claim_text'][:300]}\n"
# Truncate discussion if needed
discussion = yod["content"][:12000]
client = _get_anthropic()
message = client.messages.create(
model="claude-haiku-4-5-20251001",
max_tokens=4096,
messages=[{
"role": "user",
"content": f"""{CLAIMS_CHECK_PROMPT}
## טענות ({len(source_claims)}):
{claims_text}
## בלוק הדיון:
{discussion}""",
}],
)
raw = message.content[0].text.strip()
# Strip markdown code blocks if present
raw = re.sub(r"^```(?:json)?\s*", "", raw)
raw = re.sub(r"\s*```$", "", raw)
try:
json_match = re.search(r"\{.*\}", raw, re.DOTALL)
parsed = json.loads(json_match.group()) if json_match else json.loads(raw)
except (json.JSONDecodeError, AttributeError):
logger.warning("Failed to parse claims check: %s", raw[:300])
# Fallback: assume all covered (don't block export on parse failure)
return {"name": "claims_coverage", "passed": True,
"errors": ["שגיאה בפענוח תוצאות — לא ניתן לבדוק"], "severity": "warning"}
results = parsed.get("results", [])
missing = [r for r in results if r.get("status") == "missing"]
partial = [r for r in results if r.get("status") == "partial"]
addressed = [r for r in results if r.get("status") == "addressed"]
errors = []
# Common Hebrew stop words to skip
stop_words = {"את", "של", "על", "עם", "אל", "מן", "לא", "גם", "אם", "או",
"כי", "זה", "זו", "אין", "יש", "הם", "היא", "הוא", "כל", "עוד",
"רק", "אך", "אף", "לפי", "בין", "תוך", "מול", "ידי", "שלא"}
for r in missing:
idx = r.get("claim", 0)
claim_text = source_claims[idx - 1]["claim_text"][:80] if 0 < idx <= len(source_claims) else "?"
errors.append(f"טענה #{idx} לא נענתה: \"{claim_text}...\"")
for claim in source_claims:
claim_text = claim.get("claim_text", "")
# Extract significant words (>3 chars, not stop words)
words = [w.strip(".,;:\"'()-") for w in claim_text.split()]
keywords = [w.lower() for w in words if len(w) > 3 and w.lower() not in stop_words]
if not keywords:
continue
# Check how many keywords appear in the discussion
found_count = sum(1 for kw in keywords if kw in combined_text)
coverage = found_count / len(keywords) if keywords else 0
# Require at least 25% keyword overlap
if coverage < 0.25:
short = claim_text[:80]
errors.append(f"טענה לא נענתה ({coverage:.0%}): \"{short}...\"")
total_source = len(source_claims)
covered = total_source - len(errors)
total = len(source_claims)
covered = len(addressed) + len(partial)
return {
"name": "claims_coverage",
"passed": len(errors) <= total_source * 0.2, # Allow up to 20% uncovered
"passed": len(missing) <= total * 0.2, # Allow up to 20% missing
"errors": errors,
"severity": "critical",
"details": f"{covered}/{total_source} טענות מכוסות ({covered/total_source*100:.0f}%)" if total_source else "",
"details": f"{covered}/{total} טענות נענו ({covered/total*100:.0f}%), {len(partial)} חלקית, {len(missing)} חסרות",
}
@@ -295,14 +335,19 @@ async def validate_decision(case_id: UUID) -> dict:
appeal_type = case.get("appeal_type", "licensing")
# Run all checks
# Run sync checks
results = [
check_neutral_background(blocks),
check_claims_coverage(blocks, claims),
]
# Async check: claims coverage with Claude
results.append(await check_claims_coverage(blocks, claims))
# More sync checks
results.extend([
check_weight_compliance(blocks, appeal_type),
check_structural_integrity(blocks),
check_no_duplication(blocks),
check_sequential_numbering(blocks),
]
])
critical_failures = sum(1 for r in results if not r["passed"] and r["severity"] == "critical")
all_passed = all(r["passed"] for r in results)