Files
legal-ai/mcp-server/src/legal_mcp/services/qa_validator.py
Chaim bacb330a2a Replace all Anthropic API calls with Claude Code session (claude -p)
New module claude_session.py provides query() and query_json() that
run prompts via `claude -p` CLI — uses the claude.ai session, zero API cost.

Converted 6 services:
- claims_extractor.py: extract_claims_with_ai
- brainstorm.py: brainstorm_directions
- block_writer.py: write_block (was streaming+thinking, now simple)
- qa_validator.py: claims_coverage check
- style_analyzer.py: 3 API calls (single pass, multi pass, synthesis)
- learning_loop.py: extract_lessons

Only extractor.py still uses Anthropic API (for PDF OCR with Vision).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 14:14:08 +00:00

367 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""בקרת איכות וולידציה של החלטות לפני ייצוא.
6 בדיקות:
1. neutral_background — רקע ניטרלי (ללא מילות שיפוט/ציטוטים מצדדים)
2. claims_coverage — כל טענה מבלוק ז נענתה בבלוק י
3. weight_compliance — משקלות בלוקים בטווח הנכון
4. structural_integrity — כל בלוקי חובה קיימים
5. no_duplication — אין כפילויות בין רקע לדיון
6. sequential_numbering — מספור רציף
אם בדיקה קריטית נכשלת → חוסם ייצוא.
"""
from __future__ import annotations
import json
import logging
import re
from uuid import UUID
from legal_mcp import config
from legal_mcp.config import parse_llm_json
from legal_mcp.services import db, claude_session
logger = logging.getLogger(__name__)
# ── Value/judgment words forbidden in background ──────────────────
# Each entry is looked up as a plain substring of a background line.
VALUE_WORDS = [
    "חריג",
    "חטא",
    "בעייתי",
    "מזעזע",
    "שערורייתי",
    "מגוחך",
    "נפשע",
    "פגום",
    "חמור",
    "מקומם",
    "בלתי סביר",
    "מופרז",
    "מגונה",
    "פסול",
    "נלוז",
    "מטריד",
    "שגוי",
    "מוטעה",
]

# Regex patterns (used with ``re.search``) marking a line that attributes a
# statement to one of the parties instead of stating it neutrally.
QUOTE_INDICATORS = [
    r"לטענת\s+(העוררי|המשיב|מבקשי)",
    r"לדברי\s+(העוררי|המשיב|מבקשי)",
    r"העורר\s+טוען",
    r"המשיבה\s+טוענת",
    r"לשיטת\s+(העוררי|המשיב)",
]
# ── Weight ranges ─────────────────────────────────────────────────
# appeal type -> block id -> (minimum, maximum) share of the decision's total
# word count, expressed in percent.
WEIGHT_RANGES = {
    "licensing": {
        "block-he": (0.5, 5),
        "block-vav": (3, 40),
        "block-zayin": (13, 40),
        "block-chet": (0, 15),
        "block-tet": (0, 15),
        "block-yod": (30, 75),
        "block-yod-alef": (1, 10),
    },
    "betterment": {
        "block-he": (0, 5),
        "block-vav": (2, 20),
        "block-zayin": (15, 40),
        "block-chet": (0, 25),
        "block-tet": (0, 15),
        "block-yod": (25, 75),
        "block-yod-alef": (1, 10),
    },
    "compensation": {
        "block-he": (0, 5),
        "block-vav": (2, 20),
        "block-zayin": (15, 40),
        "block-chet": (0, 25),
        "block-tet": (0, 15),
        "block-yod": (25, 75),
        "block-yod-alef": (1, 10),
    },
}
# ── Individual checks ─────────────────────────────────────────────
def check_neutral_background(blocks: list[dict]) -> dict:
    """Check that the background block (block-vav) is written neutrally.

    Every line of the background is scanned for judgment-laden words
    (``VALUE_WORDS``) and for phrasing that attributes a statement to one of
    the parties (``QUOTE_INDICATORS``).

    Args:
        blocks: Decision blocks; each dict must carry ``block_id`` and may
            carry ``content``.

    Returns:
        A check-result dict with ``name``, ``passed``, ``errors`` and
        ``severity``. The severity is always ``"warning"``; a missing or empty
        background passes trivially.
    """
    vav = next((b for b in blocks if b["block_id"] == "block-vav"), None)
    if not vav or not vav.get("content"):
        # Nothing to inspect. Report the same severity as the evaluated path
        # so the check is labelled consistently whatever the content is.
        return {"name": "neutral_background", "passed": True, "errors": [], "severity": "warning"}

    errors = []
    for line_no, line in enumerate(vav["content"].split("\n"), 1):
        for word in VALUE_WORDS:
            if word in line:
                errors.append(f"מילת שיפוט (שורה {line_no}): \"{word}\"")
        # One report per line is enough: the message only quotes the line, so
        # several matching patterns would yield byte-identical duplicates.
        if any(re.search(pattern, line) for pattern in QUOTE_INDICATORS):
            errors.append(f"ציטוט מצד (שורה {line_no}): \"{line[:60]}...\"")

    return {
        "name": "neutral_background",
        "passed": not errors,
        "errors": errors,
        "severity": "warning",
    }
# Instruction header for the claims-coverage query. check_claims_coverage
# appends the numbered claims and the discussion text after it. The prompt
# asks for a JSON-only reply of the form
# {"results": [{"claim": <n>, "status": "addressed|partial|missing", "where": ...}]}.
CLAIMS_CHECK_PROMPT = """אתה בודק איכות החלטות משפטיות. קיבלת רשימת טענות שהועלו בכתבי הטענות, ואת בלוק הדיון של ההחלטה.
## משימה:
לכל טענה ממוספרת, קבע אם היא **נענתה** בדיון — גם אם בניסוח שונה.
## קריטריונים:
- "addressed" — הדיון מתייחס לנושא הטענה, גם אם במילים אחרות
- "partial" — הדיון נוגע בנושא אך לא עונה ישירות
- "missing" — הדיון לא מתייחס לטענה כלל
## פלט JSON בלבד:
{"results": [{"claim": 1, "status": "addressed|partial|missing", "where": "הפניה קצרה לאיפה בדיון"}]}
"""
async def check_claims_coverage(blocks: list[dict], claims: list[dict]) -> dict:
    """Check semantically (via a Claude query) that each claim is answered in the discussion.

    Args:
        blocks: Decision blocks; each dict must carry ``block_id`` and may
            carry ``content``. The discussion is the block with id
            ``block-yod``.
        claims: Claim dicts. ``claim_text`` is required; ``source_document``
            and ``party_role`` are read when selecting which claims to check.

    Returns:
        A check-result dict with ``name``, ``passed``, ``errors`` and
        ``severity``. When the model's answer could be evaluated a ``details``
        summary string is included and the check passes if at most 20% of the
        checked claims are reported as missing. An empty discussion fails with
        severity ``"critical"``; an unusable model answer passes with severity
        ``"warning"`` so that it does not block export.
    """
    yod = next((b for b in blocks if b["block_id"] == "block-yod"), None)
    if not yod or not yod.get("content"):
        return {"name": "claims_coverage", "passed": False,
                "errors": ["בלוק דיון (י) ריק"], "severity": "critical"}
    if not claims:
        return {"name": "claims_coverage", "passed": True, "errors": [], "severity": "critical"}

    # Prefer claims taken from the original pleadings (anything not sourced
    # from "block-zayin") whose party role is appellant or respondent.
    # NOTE(review): the original comment said only APPELLANT claims should be
    # checked, yet the filter also keeps "respondent" — confirm which is meant.
    source_claims = [
        c for c in claims
        if c.get("source_document", "") != "block-zayin"
        and c.get("party_role") in ("appellant", "respondent")
    ]
    if not source_claims:
        # Fallback 1: every claim that does not come from block-zayin.
        source_claims = [c for c in claims if c.get("source_document", "") != "block-zayin"]
    if not source_claims:
        # Fallback 2: check everything rather than nothing.
        source_claims = claims

    # Numbered claim list; each claim is capped at 300 characters.
    claims_text = "".join(
        f"טענה #{i}: {c['claim_text'][:300]}\n"
        for i, c in enumerate(source_claims, 1)
    )
    # The discussion is sent in full, without truncation.
    discussion = yod["content"]
    prompt = f"""{CLAIMS_CHECK_PROMPT}
## טענות ({len(source_claims)}):
{claims_text}
## בלוק הדיון:
{discussion}"""

    # NOTE(review): query_json is called without ``await``, so it presumably
    # blocks this coroutine until the reply arrives — confirm whether it
    # should be moved to a worker thread.
    parsed = claude_session.query_json(prompt, timeout=120)
    if not isinstance(parsed, dict):
        # No raw reply text is available here, so log only what we know.
        logger.warning(
            "Failed to parse claims check: expected a JSON object, got %s",
            type(parsed).__name__,
        )
        # Fallback: assume all covered (don't block export on parse failure).
        return {"name": "claims_coverage", "passed": True,
                "errors": ["שגיאה בפענוח תוצאות — לא ניתן לבדוק"], "severity": "warning"}

    raw_results = parsed.get("results")
    if not isinstance(raw_results, list):
        raw_results = []
    # Ignore malformed entries instead of crashing on them.
    results = [r for r in raw_results if isinstance(r, dict)]

    missing = [r for r in results if r.get("status") == "missing"]
    partial = [r for r in results if r.get("status") == "partial"]
    addressed = [r for r in results if r.get("status") == "addressed"]

    errors = []
    for r in missing:
        # The model may return the index as a string, or omit it altogether.
        try:
            idx = int(r.get("claim", 0))
        except (TypeError, ValueError):
            idx = 0
        claim_text = source_claims[idx - 1]["claim_text"][:80] if 0 < idx <= len(source_claims) else "?"
        errors.append(f"טענה #{idx} לא נענתה: \"{claim_text}...\"")

    total = len(source_claims)  # never 0: the last fallback is the non-empty ``claims``
    covered = len(addressed) + len(partial)
    return {
        "name": "claims_coverage",
        "passed": len(missing) <= total * 0.2,  # Allow up to 20% missing
        "errors": errors,
        "severity": "critical",
        "details": f"{covered}/{total} טענות נענו ({covered/total*100:.0f}%), {len(partial)} חלקית, {len(missing)} חסרות",
    }
def check_weight_compliance(blocks: list[dict], appeal_type: str) -> dict:
    """Check that each block's share of the total word count is within range.

    Args:
        blocks: Decision blocks; each dict must carry ``block_id`` and may
            carry ``word_count`` and ``title``.
        appeal_type: Key into ``WEIGHT_RANGES``; unknown values fall back to
            the ``"licensing"`` ranges.

    Returns:
        A check-result dict with ``name``, ``passed``, ``errors`` and
        ``severity``. A decision with no words at all fails with severity
        ``"critical"``; out-of-range blocks are reported with severity
        ``"warning"``.
    """
    # ``or 0``: ``dict.get`` only applies its default when the key is absent,
    # so a key holding None must be normalised explicitly.
    counted = [(block, block.get("word_count") or 0) for block in blocks]
    total_words = sum(wc for _, wc in counted)
    if total_words == 0:
        return {"name": "weight_compliance", "passed": False,
                "errors": ["אין תוכן בהחלטה"], "severity": "critical"}

    ranges = WEIGHT_RANGES.get(appeal_type, WEIGHT_RANGES["licensing"])
    errors = []
    for block, wc in counted:
        bid = block["block_id"]
        # Blocks without a configured range, and empty blocks, are not weighed.
        if bid not in ranges or wc <= 0:
            continue
        weight = wc / total_words * 100
        low, high = ranges[bid]
        if weight < low:
            errors.append(f"{block.get('title', bid)}: {weight:.1f}% (מינימום: {low}%)")
        elif weight > high:
            errors.append(f"{block.get('title', bid)}: {weight:.1f}% (מקסימום: {high}%)")

    return {
        "name": "weight_compliance",
        "passed": not errors,
        "errors": errors,
        "severity": "warning",
    }
def check_structural_integrity(blocks: list[dict]) -> dict:
    """Check that all mandatory blocks exist and the discussion is the largest.

    Args:
        blocks: Decision blocks; each dict must carry ``block_id`` and may
            carry ``word_count`` and ``title``.

    Returns:
        A check-result dict with ``name``, ``passed``, ``errors`` and
        ``severity`` (always ``"critical"``). A mandatory block counts as
        present only if its word count is positive. Missing blocks are
        reported in a fixed order.
    """
    # Mandatory block ids with their display names; the dict order fixes the
    # order of the error messages.
    required_names = {
        "block-he": "פתיחה (ה)",
        "block-zayin": "טענות (ז)",
        "block-yod": "דיון (י)",
        "block-yod-alef": "סיכום (יא)",
    }
    # ``or 0``: a ``word_count`` key holding None must not break the comparison.
    present = {b["block_id"] for b in blocks if (b.get("word_count") or 0) > 0}
    errors = [
        f"בלוק חובה חסר: {name}"
        for bid, name in required_names.items()
        if bid not in present
    ]

    # Check discussion is the heaviest content block; these ids are excluded
    # from the comparison.
    excluded = ("block-alef", "block-bet", "block-gimel", "block-dalet", "block-yod-bet")
    content_blocks = [b for b in blocks if b["block_id"] not in excluded]
    if content_blocks:
        heaviest = max(content_blocks, key=lambda b: b.get("word_count") or 0)
        if heaviest["block_id"] != "block-yod":
            errors.append(f"בלוק הדיון אינו הגדול ביותר — {heaviest.get('title', '')} גדול יותר")

    return {
        "name": "structural_integrity",
        "passed": not errors,
        "errors": errors,
        "severity": "critical",
    }
def check_no_duplication(blocks: list[dict]) -> dict:
    """Check that background sentences are not repeated verbatim in the discussion.

    Args:
        blocks: Decision blocks; each dict must carry ``block_id`` and may
            carry ``content``. The background is ``block-vav`` and the
            discussion is ``block-yod``.

    Returns:
        A check-result dict with ``name``, ``passed``, ``errors`` and
        ``severity`` (always ``"warning"``). The check passes trivially when
        either block is absent.
    """
    vav = next((b for b in blocks if b["block_id"] == "block-vav"), None)
    yod = next((b for b in blocks if b["block_id"] == "block-yod"), None)
    if not vav or not yod:
        return {"name": "no_duplication", "passed": True, "errors": [], "severity": "warning"}

    # ``or ""``: a ``content`` key holding None must be treated as empty text.
    vav_text = vav.get("content") or ""
    yod_text = yod.get("content") or ""

    # Background fragments between ., ! or ? that are longer than 30
    # characters. dict.fromkeys keeps first-seen order and drops repeats so
    # that each duplicated sentence is reported once.
    fragments = (part.strip() for part in re.split(r"[.!?]", vav_text))
    sentences = dict.fromkeys(s for s in fragments if len(s) > 30)
    errors = [f"כפילות: \"{sent[:60]}...\"" for sent in sentences if sent in yod_text]

    return {
        "name": "no_duplication",
        "passed": not errors,
        "errors": errors,
        "severity": "warning",
    }
def check_sequential_numbering(blocks: list[dict]) -> dict:
    """Check that paragraph numbers across all blocks form a gap-free run from 1.

    Args:
        blocks: Decision blocks; each dict may carry ``content``.

    Returns:
        A check-result dict with ``name``, ``passed``, ``errors`` and
        ``severity`` (always ``"warning"``). Numbers are pooled over all
        blocks and de-duplicated, so only gaps and a start other than 1 are
        reported. No numbered paragraphs at all counts as a pass.
    """
    # Numbered paragraphs at the start of a line: "1." or "**1.**".
    numbered = re.compile(r"^(?:\*\*)?(\d+)\.(?:\*\*)?", re.MULTILINE)

    found = set()
    for block in blocks:
        # ``or ""``: a ``content`` key holding None must be treated as empty.
        content = block.get("content") or ""
        found.update(int(n) for n in numbered.findall(content))

    errors = []
    sorted_nums = sorted(found)
    # Check for gaps between consecutive distinct numbers.
    for prev, cur in zip(sorted_nums, sorted_nums[1:]):
        if cur - prev > 1:
            # Separate the two numbers; without it "3" and "5" read as "35".
            errors.append(f"פער במספור: {prev} → {cur}")
    # Check starts at 1.
    if sorted_nums and sorted_nums[0] != 1:
        errors.append(f"מספור מתחיל מ-{sorted_nums[0]} במקום 1")

    return {
        "name": "sequential_numbering",
        "passed": not errors,
        "errors": errors,
        "severity": "warning",
    }
# ── Main validation ───────────────────────────────────────────────
async def validate_decision(case_id: UUID) -> dict:
    """Run every QA check on a case's decision and persist the results.

    Args:
        case_id: Identifier of the case whose decision is validated.

    Returns:
        A dict with ``passed`` (all checks passed), ``critical_failures``
        (number of failed checks whose severity is ``"critical"``),
        ``export_blocked`` (true when there is at least one critical failure),
        ``results`` (the individual check-result dicts), ``total_checks`` and
        ``passed_checks``.

    Raises:
        ValueError: If the case or its decision cannot be found.
    """
    case = await db.get_case(case_id)
    if not case:
        raise ValueError(f"Case {case_id} not found")
    decision = await db.get_decision_by_case(case_id)
    if not decision:
        raise ValueError(f"No decision for case {case_id}")
    decision_id = UUID(decision["id"])

    # Get blocks
    pool = await db.get_pool()
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            """SELECT block_id, block_index, title, content, word_count
            FROM decision_blocks WHERE decision_id = $1
            ORDER BY block_index""",
            decision_id,
        )
    blocks = [dict(r) for r in rows]

    # Get claims
    claims = await db.get_claims(case_id)

    # Determine appeal type
    appeal_type = case.get("appeal_type", "licensing")

    # Run all checks in a fixed order; only claims coverage is awaited.
    results = [check_neutral_background(blocks)]
    results.append(await check_claims_coverage(blocks, claims))
    results.extend([
        check_weight_compliance(blocks, appeal_type),
        check_structural_integrity(blocks),
        check_no_duplication(blocks),
        check_sequential_numbering(blocks),
    ])

    critical_failures = sum(
        1 for r in results if not r["passed"] and r["severity"] == "critical"
    )
    all_passed = all(r["passed"] for r in results)

    # Store results in qa_results table. Replace the previous rows atomically
    # so a failed insert cannot leave the case with no stored results.
    # NOTE(review): ``conn.transaction()`` assumes the pool yields
    # asyncpg-style connections (consistent with ``fetch`` and ``$1``
    # placeholders) — confirm.
    async with pool.acquire() as conn:
        async with conn.transaction():
            # Clear previous results
            await conn.execute(
                "DELETE FROM qa_results WHERE case_id = $1",
                case_id,
            )
            for result in results:
                await conn.execute(
                    """INSERT INTO qa_results
                    (decision_id, case_id, check_name, passed, severity, errors, details)
                    VALUES ($1, $2, $3, $4, $5, $6, $7)""",
                    decision_id, case_id,
                    result["name"], result["passed"], result["severity"],
                    json.dumps(result["errors"], ensure_ascii=False),
                    # Keep the summary a check may provide instead of always
                    # storing an empty string.
                    result.get("details", ""),
                )

    return {
        "passed": all_passed,
        "critical_failures": critical_failures,
        "export_blocked": critical_failures > 0,
        "results": results,
        "total_checks": len(results),
        "passed_checks": sum(1 for r in results if r["passed"]),
    }