Add full decision writing pipeline: classify, extract, brainstorm, write, QA, export

New services (11 files):
- classifier.py: auto doc-type classification + party identification (Claude Haiku)
- claims_extractor.py: claim extraction from pleadings (Claude Sonnet + regex)
- references_extractor.py: plan/case-law/legislation detection (regex)
- brainstorm.py: direction generation with 2-3 options (Claude Sonnet)
- block_writer.py: 12-block decision writer (template + Claude Sonnet/Opus)
- docx_exporter.py: DOCX export with David font, RTL, headings
- qa_validator.py: 6 QA checks with export blocking on critical failure
- learning_loop.py: draft vs final comparison + lesson extraction
- metrics.py: KPIs dashboard per case and global
- audit.py: action audit log
- cli.py: standalone CLI with 11 commands

Updated pipeline: extract → classify → chunk → embed → store → extract_references
New MCP tools: 29 total (was 16)
New DB tables: audit_log, decisions CRUD, claims CRUD
Config: Infisical support, external service allowlist

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-03 10:21:47 +00:00
parent df7cc4f5a5
commit d9e5ef0f46
21 changed files with 3957 additions and 14 deletions

View File

@@ -0,0 +1,309 @@
"""בקרת איכות וולידציה של החלטות לפני ייצוא.
6 בדיקות:
1. neutral_background — רקע ניטרלי (ללא מילות שיפוט/ציטוטים מצדדים)
2. claims_coverage — כל טענה מבלוק ז נענתה בבלוק י
3. weight_compliance — משקלות בלוקים בטווח הנכון
4. structural_integrity — כל בלוקי חובה קיימים
5. no_duplication — אין כפילויות בין רקע לדיון
6. sequential_numbering — מספור רציף
אם בדיקה קריטית נכשלת → חוסם ייצוא.
"""
from __future__ import annotations
import json
import logging
import re
from uuid import UUID
from legal_mcp.services import db
logger = logging.getLogger(__name__)

# ── Value/judgment words forbidden in background ──────────────────
# Hebrew judgment-laden terms. check_neutral_background looks for each one as
# a plain substring of every background line.
VALUE_WORDS = [
    "חריג",
    "חטא",
    "בעייתי",
    "מזעזע",
    "שערורייתי",
    "מגוחך",
    "נפשע",
    "פגום",
    "חמור",
    "מקומם",
    "בלתי סביר",
    "מופרז",
    "מגונה",
    "פסול",
    "נלוז",
    "מטריד",
    "שגוי",
    "מוטעה",
]

# Regex patterns (used with re.search) for phrases that attribute a statement
# to one of the parties.
QUOTE_INDICATORS = [
    r"לטענת\s+(העוררי|המשיב|מבקשי)",
    r"לדברי\s+(העוררי|המשיב|מבקשי)",
    r"העורר\s+טוען",
    r"המשיבה\s+טוענת",
    r"לשיטת\s+(העוררי|המשיב)",
]

# ── Weight ranges ─────────────────────────────────────────────────
# appeal type -> block id -> (low, high) bounds, expressed as a percentage of
# the decision's total word count.
WEIGHT_RANGES = {
    "licensing": {
        "block-he": (0.5, 5),
        "block-vav": (3, 40),
        "block-zayin": (13, 40),
        "block-chet": (0, 15),
        "block-tet": (0, 15),
        "block-yod": (30, 75),
        "block-yod-alef": (1, 10),
    },
    "betterment": {
        "block-he": (0, 5),
        "block-vav": (2, 20),
        "block-zayin": (15, 40),
        "block-chet": (0, 25),
        "block-tet": (0, 15),
        "block-yod": (25, 75),
        "block-yod-alef": (1, 10),
    },
    "compensation": {
        "block-he": (0, 5),
        "block-vav": (2, 20),
        "block-zayin": (15, 40),
        "block-chet": (0, 25),
        "block-tet": (0, 15),
        "block-yod": (25, 75),
        "block-yod-alef": (1, 10),
    },
}
# ── Individual checks ─────────────────────────────────────────────
def check_neutral_background(blocks: list[dict]) -> dict:
    """Check that the background block (Vav) is written neutrally.

    Every line of the ``block-vav`` content is scanned for judgment words
    (``VALUE_WORDS``, plain substring match) and for phrases attributing a
    statement to a party (``QUOTE_INDICATORS``, regex search). Each hit adds
    one error carrying the 1-based line number.

    Args:
        blocks: Decision blocks; each dict must have ``block_id`` and may
            have ``content``.

    Returns:
        A result dict with ``name``, ``passed``, ``errors`` and ``severity``.
        A missing or empty background block passes with no errors.
    """
    vav = next((b for b in blocks if b["block_id"] == "block-vav"), None)
    if not vav or not vav.get("content"):
        # This path used to report "critical" while the scanned path below
        # reports "warning". The severity of one check must not depend on
        # whether the block happened to be empty, so both now agree on the
        # value that governs export blocking when the check fails.
        return {"name": "neutral_background", "passed": True, "errors": [], "severity": "warning"}

    errors = []
    for line_no, line in enumerate(vav["content"].split("\n"), start=1):
        for word in VALUE_WORDS:
            if word in line:
                errors.append(f"מילת שיפוט (שורה {line_no}): \"{word}\"")
        for pattern in QUOTE_INDICATORS:
            if re.search(pattern, line):
                errors.append(f"ציטוט מצד (שורה {line_no}): \"{line[:60]}...\"")

    return {
        "name": "neutral_background",
        "passed": len(errors) == 0,
        "errors": errors,
        "severity": "warning",
    }
def check_claims_coverage(blocks: list[dict], claims: list[dict]) -> dict:
    """Check that every claim is addressed in the discussion block (Yod).

    A claim counts as addressed when one of its first five key phrases
    (consecutive three-word windows longer than 8 characters) occurs in the
    lower-cased discussion text. Claims too short to yield a key phrase are
    matched as a whole instead of being reported as unanswered
    unconditionally.

    Args:
        blocks: Decision blocks; each dict must have ``block_id`` and may
            have ``content``.
        claims: Claim dicts; the text is read from ``claim_text``.

    Returns:
        A result dict with ``name``, ``passed``, ``errors`` and ``severity``
        (always ``"critical"``). An empty or missing discussion block fails;
        an empty claims list passes.
    """
    yod = next((b for b in blocks if b["block_id"] == "block-yod"), None)
    if not yod or not yod.get("content"):
        return {"name": "claims_coverage", "passed": False,
                "errors": ["בלוק דיון (י) ריק"], "severity": "critical"}
    if not claims:
        return {"name": "claims_coverage", "passed": True, "errors": [], "severity": "critical"}

    # Key phrases are joined with single spaces, so collapse whitespace runs
    # in the discussion too; otherwise a phrase that wraps across a line break
    # or a double space would never match.
    yod_text = " ".join(yod["content"].lower().split())

    errors = []
    for claim in claims:
        claim_text = claim.get("claim_text") or ""  # tolerate a None value
        words = claim_text.split()
        if not words:
            # A blank claim has nothing that could be looked up.
            continue

        key_phrases = []
        for start in range(len(words) - 2):
            phrase = " ".join(words[start:start + 3])
            if len(phrase) > 8:
                key_phrases.append(phrase.lower())
        if not key_phrases:
            # Fewer than three words (or only very short windows): fall back
            # to the whole claim so short claims can still be found.
            key_phrases = [" ".join(words).lower()]

        if not any(phrase in yod_text for phrase in key_phrases[:5]):
            short = claim_text[:80]
            errors.append(f"טענה לא נענתה: \"{short}...\"")

    return {
        "name": "claims_coverage",
        "passed": len(errors) == 0,
        "errors": errors,
        "severity": "critical",
    }
def check_weight_compliance(blocks: list[dict], appeal_type: str) -> dict:
    """Check that each block's share of the decision is within its range.

    A block's weight is its ``word_count`` as a percentage of the summed
    word counts of all blocks. Only non-empty blocks that have an entry in
    the range table are checked.

    Args:
        blocks: Decision blocks; each dict must have ``block_id`` and may
            have ``word_count`` and ``title``.
        appeal_type: Key into ``WEIGHT_RANGES``; unknown values fall back to
            the ``"licensing"`` ranges.

    Returns:
        A result dict with ``name``, ``passed``, ``errors`` and ``severity``.
        A decision with no words at all fails with severity ``"critical"``;
        out-of-range weights are reported with severity ``"warning"``.
    """
    # ``or 0`` also covers a word_count that is present but None, which
    # would otherwise make sum() raise TypeError.
    word_counts = [b.get("word_count") or 0 for b in blocks]
    total_words = sum(word_counts)
    if total_words == 0:
        return {"name": "weight_compliance", "passed": False,
                "errors": ["אין תוכן בהחלטה"], "severity": "critical"}

    ranges = WEIGHT_RANGES.get(appeal_type, WEIGHT_RANGES["licensing"])

    errors = []
    for block, wc in zip(blocks, word_counts):
        bid = block["block_id"]
        if bid not in ranges or wc <= 0:
            continue
        weight = wc / total_words * 100
        low, high = ranges[bid]
        # Fall back to the block id when the title is missing, None or empty
        # so the message always names the block.
        label = block.get("title") or bid
        if weight < low:
            errors.append(f"{label}: {weight:.1f}% (מינימום: {low}%)")
        elif weight > high:
            errors.append(f"{label}: {weight:.1f}% (מקסימום: {high}%)")

    return {
        "name": "weight_compliance",
        "passed": len(errors) == 0,
        "errors": errors,
        "severity": "warning",
    }
def check_structural_integrity(blocks: list[dict]) -> dict:
    """Check that all mandatory blocks exist and the discussion dominates.

    Two rules are enforced:

    * ``block-he``, ``block-zayin``, ``block-yod`` and ``block-yod-alef``
      must each be present with a positive ``word_count``.
    * Among the content blocks (everything except alef, bet, gimel, dalet
      and yod-bet) no block may have strictly more words than the
      discussion block ``block-yod``.

    Args:
        blocks: Decision blocks; each dict must have ``block_id`` and may
            have ``word_count`` and ``title``.

    Returns:
        A result dict with ``name``, ``passed``, ``errors`` and ``severity``
        (always ``"critical"``).
    """
    # Insertion order doubles as the reporting order, so the error list is
    # deterministic (iterating a set difference is not).
    required = {
        "block-he": "פתיחה (ה)",
        "block-zayin": "טענות (ז)",
        "block-yod": "דיון (י)",
        "block-yod-alef": "סיכום (יא)",
    }
    # ``or 0`` tolerates a word_count that is present but None.
    present = {b["block_id"] for b in blocks if (b.get("word_count") or 0) > 0}
    errors = [
        f"בלוק חובה חסר: {label}"
        for block_id, label in required.items()
        if block_id not in present
    ]

    # Check discussion is the heaviest content block
    non_content = ("block-alef", "block-bet", "block-gimel", "block-dalet", "block-yod-bet")
    content_blocks = [b for b in blocks if b["block_id"] not in non_content]
    if content_blocks:
        heaviest = max(content_blocks, key=lambda b: b.get("word_count") or 0)
        yod_words = max(
            ((b.get("word_count") or 0) for b in content_blocks if b["block_id"] == "block-yod"),
            default=0,
        )
        # Require a strictly larger block: on a tie max() returns whichever
        # block comes first, and claiming it "is larger" would be false.
        if heaviest["block_id"] != "block-yod" and (heaviest.get("word_count") or 0) > yod_words:
            errors.append(f"בלוק הדיון אינו הגדול ביותר — {heaviest.get('title', '')} גדול יותר")

    return {
        "name": "structural_integrity",
        "passed": len(errors) == 0,
        "errors": errors,
        "severity": "critical",
    }
def check_no_duplication(blocks: list[dict]) -> dict:
    """Check that background sentences are not repeated in the discussion.

    The background (``block-vav``) is split on ``.``, ``!`` and ``?``; every
    resulting sentence longer than 30 characters (after stripping) that
    occurs verbatim in the discussion (``block-yod``) is reported once.

    Args:
        blocks: Decision blocks; each dict must have ``block_id`` and may
            have ``content``.

    Returns:
        A result dict with ``name``, ``passed``, ``errors`` and ``severity``
        (always ``"warning"``). Passes when either block is missing.
    """
    vav = next((b for b in blocks if b["block_id"] == "block-vav"), None)
    yod = next((b for b in blocks if b["block_id"] == "block-yod"), None)
    if not vav or not yod:
        return {"name": "no_duplication", "passed": True, "errors": [], "severity": "warning"}

    # ``or ""`` covers content that is present but None, which would make
    # re.split raise TypeError.
    vav_text = vav.get("content") or ""
    yod_text = yod.get("content") or ""

    # Find sentences from background repeated verbatim in discussion
    candidates = (part.strip() for part in re.split(r'[.!?]', vav_text))
    # dict.fromkeys drops repeats while keeping first-seen order, so a
    # sentence that appears twice in the background is reported only once.
    sentences = dict.fromkeys(s for s in candidates if len(s) > 30)
    errors = [f"כפילות: \"{sent[:60]}...\"" for sent in sentences if sent in yod_text]

    return {
        "name": "no_duplication",
        "passed": len(errors) == 0,
        "errors": errors,
        "severity": "warning",
    }
def check_sequential_numbering(blocks: list[dict]) -> dict:
    """Check that paragraph numbers across all blocks form 1, 2, 3, ...

    Numbers are taken from lines that start with digits followed by a dot
    (e.g. ``"15."``) in every block's ``content``. The distinct numbers are
    sorted; any jump greater than one is reported as a gap, and a sequence
    that does not begin at 1 is reported as well. Repeated numbers are not
    flagged.

    Args:
        blocks: Decision blocks; ``content`` is read from each when present.

    Returns:
        A result dict with ``name``, ``passed``, ``errors`` and ``severity``
        (always ``"warning"``). Passes when no numbered lines are found.
    """
    all_numbers = []
    for block in blocks:
        # ``or ""`` covers content that is present but None, which would
        # make re.findall raise TypeError.
        content = block.get("content") or ""
        # Find numbered paragraphs (e.g., "1.", "2.", "15.")
        all_numbers.extend(int(n) for n in re.findall(r"^(\d+)\.", content, re.MULTILINE))

    errors = []
    if all_numbers:
        sorted_nums = sorted(set(all_numbers))
        # Check for gaps
        for prev, curr in zip(sorted_nums, sorted_nums[1:]):
            if curr - prev > 1:
                # The two bounds used to be concatenated with nothing between
                # them ("2" and "4" rendered as "24"); separate them.
                errors.append(f"פער במספור: {prev} → {curr}")
        # Check starts at 1
        if sorted_nums[0] != 1:
            errors.append(f"מספור מתחיל מ-{sorted_nums[0]} במקום 1")

    return {
        "name": "sequential_numbering",
        "passed": len(errors) == 0,
        "errors": errors,
        "severity": "warning",
    }
# ── Main validation ───────────────────────────────────────────────
async def validate_decision(case_id: UUID) -> dict:
    """Run all QA checks on the decision of a case and persist the results.

    Loads the case, its decision, the decision's blocks and the case's
    claims, runs the six checks, replaces the case's rows in ``qa_results``
    with the new outcome and returns a summary.

    Args:
        case_id: Identifier of the case whose decision is validated.

    Returns:
        A dict with ``passed`` (all checks passed), ``critical_failures``
        (number of failed checks whose severity is ``"critical"``),
        ``export_blocked`` (True when there is at least one critical
        failure), ``results`` (the per-check result dicts),
        ``total_checks`` and ``passed_checks``.

    Raises:
        ValueError: If the case does not exist or has no decision.
    """
    case = await db.get_case(case_id)
    if not case:
        raise ValueError(f"Case {case_id} not found")
    decision = await db.get_decision_by_case(case_id)
    if not decision:
        raise ValueError(f"No decision for case {case_id}")

    # Convert once and reuse. Going through str() keeps this working whether
    # the db layer hands back the id as text or as a UUID instance.
    decision_id = UUID(str(decision["id"]))

    # Get blocks
    pool = await db.get_pool()
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            """SELECT block_id, block_index, title, content, word_count
            FROM decision_blocks WHERE decision_id = $1
            ORDER BY block_index""",
            decision_id,
        )
    blocks = [dict(r) for r in rows]

    # Get claims
    claims = await db.get_claims(case_id)

    # Determine appeal type
    appeal_type = case.get("appeal_type", "licensing")

    # Run all checks
    results = [
        check_neutral_background(blocks),
        check_claims_coverage(blocks, claims),
        check_weight_compliance(blocks, appeal_type),
        check_structural_integrity(blocks),
        check_no_duplication(blocks),
        check_sequential_numbering(blocks),
    ]
    critical_failures = sum(1 for r in results if not r["passed"] and r["severity"] == "critical")
    all_passed = all(r["passed"] for r in results)

    # Store results in qa_results table.
    # The delete and the inserts run in one transaction so a failure part-way
    # through cannot leave the case with no results or only some of them.
    # NOTE(review): conn.transaction() is the asyncpg Connection API; the
    # $n placeholders and pool.acquire()/conn.fetch usage here match asyncpg —
    # confirm db.get_pool() returns an asyncpg pool.
    async with pool.acquire() as conn:
        async with conn.transaction():
            # Clear previous results
            await conn.execute(
                "DELETE FROM qa_results WHERE case_id = $1",
                case_id,
            )
            for result in results:
                await conn.execute(
                    """INSERT INTO qa_results
                    (decision_id, case_id, check_name, passed, severity, errors, details)
                    VALUES ($1, $2, $3, $4, $5, $6, $7)""",
                    decision_id, case_id,
                    result["name"], result["passed"], result["severity"],
                    json.dumps(result["errors"], ensure_ascii=False),
                    "",
                )

    return {
        "passed": all_passed,
        "critical_failures": critical_failures,
        "export_blocked": critical_failures > 0,
        "results": results,
        "total_checks": len(results),
        "passed_checks": sum(1 for r in results if r["passed"]),
    }