"""Style analyzer - extracts writing patterns from Dafna's decision corpus.""" from __future__ import annotations import json import logging import re from legal_mcp import config from legal_mcp.services import db, claude_session logger = logging.getLogger(__name__) # Token budget for Opus 1M context MAX_INPUT_TOKENS = 900_000 CHARS_PER_TOKEN = 4 # Hebrew text ratio ANALYSIS_PROMPT = """\ אתה מנתח סגנון כתיבה משפטית. לפניך החלטות משפטיות מלאות שנכתבו על ידי אותה יושבת ראש של ועדת ערר. נתח את ההחלטות לעומק וחלץ את דפוסי הכתיבה הבאים: 1. **נוסחאות פתיחה** (opening_formula) - איך מתחילות ההחלטות, מה המבנה של הפסקה הראשונה 2. **ביטויי מעבר** (transition) - ביטויים שמחברים בין חלקי ההחלטה 3. **סגנון ציטוט** (citation_style) - איך מצטטים חקיקה, פסיקה, פרוטוקולים ומסמכים 4. **מבנה ניתוח** (analysis_structure) - איך בנוי הניתוח המשפטי, סדר הדיון בטענות 5. **נוסחאות סיום** (closing_formula) - איך מסתיימות ההחלטות, כולל הוצאות ותאריך 6. **ביטויים אופייניים** (characteristic_phrase) - ביטויים ייחודיים שחוזרים על פני ההחלטות 7. **זרימת טיעון** (argument_flow) - איך נבנה טיעון משפטי לאורך ההחלטה, מהצגת הבעיה דרך ניתוח ועד הכרעה 8. **התייחסות לראיות** (evidence_handling) - איך מתייחסת לראיות, מסמכים, חוות דעת ועדויות לכל דפוס, תן: - הטקסט המדויק של הדפוס - הקשר (באיזה חלק של ההחלטה הוא מופיע) - דוגמה מתוך הטקסט חשוב: אתה רואה את ההחלטות המלאות. נצל את זה כדי לזהות דפוסים מכל חלקי ההחלטה - כולל אמצע הניתוח המשפטי, לא רק פתיחה וסיום. החזר את התוצאות בפורמט הבא (JSON array): ```json [ {{ "type": "opening_formula", "text": "לפניי ערר על החלטת...", "context": "פתיחת ההחלטה", "example": "לפניי ערר על החלטת הוועדה המקומית לתכנון ובניה ירושלים" }} ] ``` ההחלטות: {decisions} """ SINGLE_DECISION_PROMPT = """\ אתה מנתח סגנון כתיבה משפטית. לפניך החלטה משפטית מלאה שנכתבה על ידי יושבת ראש של ועדת ערר. חלץ את כל דפוסי הכתיבה מההחלטה הזו, כולל: 1. נוסחאות פתיחה (opening_formula) 2. ביטויי מעבר (transition) 3. סגנון ציטוט (citation_style) 4. מבנה ניתוח (analysis_structure) 5. נוסחאות סיום (closing_formula) 6. ביטויים אופייניים (characteristic_phrase) 7. זרימת טיעון (argument_flow) 8. התייחסות לראיות (evidence_handling) לכל דפוס, תן: הטקסט המדויק, הקשר, ודוגמה מתוך הטקסט. החזר JSON array בפורמט: ```json [ {{ "type": "opening_formula", "text": "...", "context": "...", "example": "..." }} ] ``` ההחלטה: {decision} """ SYNTHESIS_PROMPT = """\ לפניך דפוסי כתיבה שחולצו מ-{num_decisions} החלטות משפטיות של אותה יושבת ראש ועדת ערר. משימתך: 1. איחוד דפוסים כפולים או דומים 2. זיהוי דפוסים שחוזרים על פני מספר החלטות (ציין תדירות) 3. הבחנה בין דפוסים אופייניים באמת לבין ניסוחים חד-פעמיים 4. שמירה על המבנה: type, text, context, example החזר JSON array מאוחד של הדפוסים המשמעותיים ביותר: ```json [ {{ "type": "opening_formula", "text": "...", "context": "...", "example": "..." }} ] ``` הדפוסים שחולצו: {patterns} """ async def analyze_corpus(appeal_subtype: str = "") -> dict: """Analyze the style corpus and extract/update patterns. Args: appeal_subtype: filter by appeal subtype (e.g. 'betterment_levy', 'building_permit'). Empty string = all decisions. Returns summary of patterns found. """ pool = await db.get_pool() async with pool.acquire() as conn: if appeal_subtype: rows = await conn.fetch( "SELECT full_text, decision_number FROM style_corpus " "WHERE appeal_subtype = $1 ORDER BY decision_date DESC LIMIT 20", appeal_subtype, ) else: rows = await conn.fetch( "SELECT full_text, decision_number FROM style_corpus ORDER BY decision_date DESC LIMIT 20" ) if not rows: return {"error": "אין החלטות בקורפוס. העלה החלטות קודמות תחילה."} # Clear old patterns for this subtype (or all if unfiltered) await db.clear_style_patterns(appeal_subtype) # Calculate token budget total_chars = sum(len(row["full_text"]) for row in rows) estimated_tokens = total_chars // CHARS_PER_TOKEN logger.info( "Style analysis: %d decisions, %d chars, ~%d tokens", len(rows), total_chars, estimated_tokens, ) if estimated_tokens < MAX_INPUT_TOKENS: return await _analyze_single_pass(rows, appeal_subtype) else: return await _analyze_multi_pass(rows, appeal_subtype) async def _analyze_single_pass(rows, appeal_subtype: str = "") -> dict: """Send all decisions in a single API call.""" decisions_text = "" for row in rows: decisions_text += f"\n\n--- החלטה {row['decision_number'] or 'ללא מספר'} ---\n" decisions_text += row["full_text"] raw = await claude_session.query( ANALYSIS_PROMPT.format(decisions=decisions_text), timeout=claude_session.LONG_TIMEOUT, ) return await _parse_and_store_patterns(raw, len(rows), appeal_subtype) async def _analyze_multi_pass(rows, appeal_subtype: str = "") -> dict: """Analyze each decision individually, then synthesize patterns.""" all_patterns = [] # Pass 1: Analyze each decision individually for row in rows: decision_text = f"--- החלטה {row['decision_number'] or 'ללא מספר'} ---\n" decision_text += row["full_text"] raw = await claude_session.query( SINGLE_DECISION_PROMPT.format(decision=decision_text), timeout=claude_session.LONG_TIMEOUT, ) patterns = _extract_json(raw) if patterns: all_patterns.extend(patterns) if not all_patterns: return {"error": "לא הצלחתי לחלץ דפוסים מההחלטות"} # Pass 2: Synthesize across all decisions raw = await claude_session.query( SYNTHESIS_PROMPT.format( num_decisions=len(rows), patterns=json.dumps(all_patterns, ensure_ascii=False, indent=2), ), timeout=claude_session.LONG_TIMEOUT, ) return await _parse_and_store_patterns(raw, len(rows), appeal_subtype) def _extract_json(response_text: str) -> list | None: """Extract JSON array from Claude's response text.""" # Strategy 1: Extract content between code fences, then parse code_block = re.search(r"```(?:json)?\s*([\s\S]*?)```", response_text) if code_block: block_content = code_block.group(1).strip() try: result = json.loads(block_content) if isinstance(result, list): return result except json.JSONDecodeError: pass # Strategy 2: Find the outermost JSON array using bracket matching start = response_text.find("[") if start == -1: return None depth = 0 in_string = False escape_next = False for i in range(start, len(response_text)): c = response_text[i] if escape_next: escape_next = False continue if c == "\\": escape_next = True continue if c == '"': in_string = not in_string continue if in_string: continue if c == "[": depth += 1 elif c == "]": depth -= 1 if depth == 0: try: return json.loads(response_text[start:i + 1]) except json.JSONDecodeError as e: logger.warning("JSON parse error: %s", e) return None return None async def _parse_and_store_patterns( response_text: str, num_decisions: int, appeal_subtype: str = "", ) -> dict: """Parse Claude's response and store patterns in the database.""" patterns = _extract_json(response_text) if patterns is None: return {"error": "Could not parse analysis results", "raw": response_text} # Store patterns tagged by appeal_subtype count = 0 for pattern in patterns: await db.upsert_style_pattern( pattern_type=pattern.get("type", "other"), pattern_text=pattern.get("text", ""), context=pattern.get("context", ""), examples=[pattern.get("example", "")], appeal_subtype=appeal_subtype, ) count += 1 return { "patterns_found": count, "decisions_analyzed": num_decisions, "appeal_subtype": appeal_subtype or "all", "pattern_types": list({p.get("type") for p in patterns}), }