Add outcome-aware drafting, lessons system, and improved style analysis

- Add expected_outcome field to cases (rejection/partial/full/betterment_levy)
- New lessons.py module with golden ratios, templates, and drafting guidance per outcome type
- Style analyzer now uses Opus with full decision text (no truncation), with multi-pass fallback for large corpora
- Drafting tool provides outcome-specific templates, section guidance, and ratio comments
- Improved JSON extraction with bracket-matching fallback

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-24 18:58:42 +00:00
parent 6f515dc2cb
commit 39089dcef5
6 changed files with 726 additions and 71 deletions

View File

@@ -2,6 +2,7 @@
from __future__ import annotations
import json
import logging
import re
@@ -12,24 +13,31 @@ from legal_mcp.services import db
logger = logging.getLogger(__name__)
# Token budget for Opus 1M context
MAX_INPUT_TOKENS = 900_000
CHARS_PER_TOKEN = 4 # Hebrew text ratio
ANALYSIS_PROMPT = """\
אתה מנתח סגנון כתיבה משפטית. לפניך החלטות משפטיות שנכתבו על ידי אותה יושבת ראש של ועדת ערר.
אתה מנתח סגנון כתיבה משפטית. לפניך החלטות משפטיות מלאות שנכתבו על ידי אותה יושבת ראש של ועדת ערר.
נתח את ההחלטות וחלץ את דפוסי הכתיבה הבאים:
נתח את ההחלטות לעומק וחלץ את דפוסי הכתיבה הבאים:
1. **נוסחאות פתיחה** (opening_formula) - איך מתחילות ההחלטות
1. **נוסחאות פתיחה** (opening_formula) - איך מתחילות ההחלטות, מה המבנה של הפסקה הראשונה
2. **ביטויי מעבר** (transition) - ביטויים שמחברים בין חלקי ההחלטה
3. **סגנון ציטוט** (citation_style) - איך מצטטים חקיקה ופסיקה
4. **מבנה ניתוח** (analysis_structure) - איך בנוי הניתוח המשפטי
5. **נוסחאות סיום** (closing_formula) - איך מסתיימות ההחלטות
6. **ביטויים אופייניים** (characteristic_phrase) - ביטויים ייחודיים שחוזרים
3. **סגנון ציטוט** (citation_style) - איך מצטטים חקיקה, פסיקה, פרוטוקולים ומסמכים
4. **מבנה ניתוח** (analysis_structure) - איך בנוי הניתוח המשפטי, סדר הדיון בטענות
5. **נוסחאות סיום** (closing_formula) - איך מסתיימות ההחלטות, כולל הוצאות ותאריך
6. **ביטויים אופייניים** (characteristic_phrase) - ביטויים ייחודיים שחוזרים על פני ההחלטות
7. **זרימת טיעון** (argument_flow) - איך נבנה טיעון משפטי לאורך ההחלטה, מהצגת הבעיה דרך ניתוח ועד הכרעה
8. **התייחסות לראיות** (evidence_handling) - איך מתייחסת לראיות, מסמכים, חוות דעת ועדויות
לכל דפוס, תן:
- הטקסט המדויק של הדפוס
- הקשר (באיזה חלק של ההחלטה הוא מופיע)
- דוגמה מתוך הטקסט
חשוב: אתה רואה את ההחלטות המלאות. נצל את זה כדי לזהות דפוסים מכל חלקי ההחלטה - כולל אמצע הניתוח המשפטי, לא רק פתיחה וסיום.
החזר את התוצאות בפורמט הבא (JSON array):
```json
[
@@ -46,6 +54,62 @@ ANALYSIS_PROMPT = """\
{decisions}
"""
SINGLE_DECISION_PROMPT = """\
אתה מנתח סגנון כתיבה משפטית. לפניך החלטה משפטית מלאה שנכתבה על ידי יושבת ראש של ועדת ערר.
חלץ את כל דפוסי הכתיבה מההחלטה הזו, כולל:
1. נוסחאות פתיחה (opening_formula)
2. ביטויי מעבר (transition)
3. סגנון ציטוט (citation_style)
4. מבנה ניתוח (analysis_structure)
5. נוסחאות סיום (closing_formula)
6. ביטויים אופייניים (characteristic_phrase)
7. זרימת טיעון (argument_flow)
8. התייחסות לראיות (evidence_handling)
לכל דפוס, תן: הטקסט המדויק, הקשר, ודוגמה מתוך הטקסט.
החזר JSON array בפורמט:
```json
[
{{
"type": "opening_formula",
"text": "...",
"context": "...",
"example": "..."
}}
]
```
ההחלטה:
{decision}
"""
SYNTHESIS_PROMPT = """\
לפניך דפוסי כתיבה שחולצו מ-{num_decisions} החלטות משפטיות של אותה יושבת ראש ועדת ערר.
משימתך:
1. איחוד דפוסים כפולים או דומים
2. זיהוי דפוסים שחוזרים על פני מספר החלטות (ציין תדירות)
3. הבחנה בין דפוסים אופייניים באמת לבין ניסוחים חד-פעמיים
4. שמירה על המבנה: type, text, context, example
החזר JSON array מאוחד של הדפוסים המשמעותיים ביותר:
```json
[
{{
"type": "opening_formula",
"text": "...",
"context": "...",
"example": "..."
}}
]
```
הדפוסים שחולצו:
{patterns}
"""
async def analyze_corpus() -> dict:
"""Analyze the style corpus and extract/update patterns.
@@ -61,20 +125,34 @@ async def analyze_corpus() -> dict:
if not rows:
return {"error": "אין החלטות בקורפוס. העלה החלטות קודמות תחילה."}
# Prepare text for analysis
# Clear old patterns before re-analysis
await db.clear_style_patterns()
# Calculate token budget
total_chars = sum(len(row["full_text"]) for row in rows)
estimated_tokens = total_chars // CHARS_PER_TOKEN
logger.info(
"Style analysis: %d decisions, %d chars, ~%d tokens",
len(rows), total_chars, estimated_tokens,
)
if estimated_tokens < MAX_INPUT_TOKENS:
return await _analyze_single_pass(rows)
else:
return await _analyze_multi_pass(rows)
async def _analyze_single_pass(rows) -> dict:
"""Send all decisions in a single API call."""
decisions_text = ""
for row in rows:
decisions_text += f"\n\n--- החלטה {row['decision_number'] or 'ללא מספר'} ---\n"
# Limit each decision to ~3000 chars to fit context
text = row["full_text"]
if len(text) > 3000:
text = text[:1500] + "\n...\n" + text[-1500:]
decisions_text += text
decisions_text += row["full_text"]
# Call Claude to analyze patterns
client = anthropic.Anthropic(api_key=config.ANTHROPIC_API_KEY)
message = client.messages.create(
model="claude-sonnet-4-6",
model="claude-opus-4-6",
max_tokens=16384,
messages=[
{
@@ -84,24 +162,109 @@ async def analyze_corpus() -> dict:
],
)
response_text = message.content[0].text
return await _parse_and_store_patterns(message.content[0].text, len(rows))
# Extract JSON from response - prefer code-block fenced JSON
import json
code_block = re.search(r"```(?:json)?\s*(\[[\s\S]*?\])\s*```", response_text)
async def _analyze_multi_pass(rows) -> dict:
"""Analyze each decision individually, then synthesize patterns."""
client = anthropic.Anthropic(api_key=config.ANTHROPIC_API_KEY)
all_patterns = []
# Pass 1: Analyze each decision individually
for row in rows:
decision_text = f"--- החלטה {row['decision_number'] or 'ללא מספר'} ---\n"
decision_text += row["full_text"]
message = client.messages.create(
model="claude-opus-4-6",
max_tokens=8192,
messages=[
{
"role": "user",
"content": SINGLE_DECISION_PROMPT.format(decision=decision_text),
}
],
)
patterns = _extract_json(message.content[0].text)
if patterns:
all_patterns.extend(patterns)
if not all_patterns:
return {"error": "לא הצלחתי לחלץ דפוסים מההחלטות"}
# Pass 2: Synthesize across all decisions
message = client.messages.create(
model="claude-opus-4-6",
max_tokens=16384,
messages=[
{
"role": "user",
"content": SYNTHESIS_PROMPT.format(
num_decisions=len(rows),
patterns=json.dumps(all_patterns, ensure_ascii=False, indent=2),
),
}
],
)
return await _parse_and_store_patterns(message.content[0].text, len(rows))
def _extract_json(response_text: str) -> list | None:
"""Extract JSON array from Claude's response text."""
# Strategy 1: Extract content between code fences, then parse
code_block = re.search(r"```(?:json)?\s*([\s\S]*?)```", response_text)
if code_block:
json_str = code_block.group(1)
else:
# Fallback: find the last JSON array (skip prose brackets)
all_arrays = list(re.finditer(r"\[[\s\S]*?\]", response_text))
if not all_arrays:
return {"error": "Could not parse analysis results", "raw": response_text}
json_str = all_arrays[-1].group()
block_content = code_block.group(1).strip()
try:
result = json.loads(block_content)
if isinstance(result, list):
return result
except json.JSONDecodeError:
pass
try:
patterns = json.loads(json_str)
except json.JSONDecodeError as e:
return {"error": f"JSON parse error: {e}", "raw": response_text}
# Strategy 2: Find the outermost JSON array using bracket matching
start = response_text.find("[")
if start == -1:
return None
depth = 0
in_string = False
escape_next = False
for i in range(start, len(response_text)):
c = response_text[i]
if escape_next:
escape_next = False
continue
if c == "\\":
escape_next = True
continue
if c == '"':
in_string = not in_string
continue
if in_string:
continue
if c == "[":
depth += 1
elif c == "]":
depth -= 1
if depth == 0:
try:
return json.loads(response_text[start:i + 1])
except json.JSONDecodeError as e:
logger.warning("JSON parse error: %s", e)
return None
return None
async def _parse_and_store_patterns(response_text: str, num_decisions: int) -> dict:
"""Parse Claude's response and store patterns in the database."""
patterns = _extract_json(response_text)
if patterns is None:
return {"error": "Could not parse analysis results", "raw": response_text}
# Store patterns
count = 0
@@ -116,6 +279,6 @@ async def analyze_corpus() -> dict:
return {
"patterns_found": count,
"decisions_analyzed": len(rows),
"decisions_analyzed": num_decisions,
"pattern_types": list({p.get("type") for p in patterns}),
}