"""Style-distance metric (T7) — do drafts converge toward Dafna over time?

Three components, all LLM-free (deterministic, cheap):
  1. golden_ratio_adherence — deviation of section percentages from
     GOLDEN_RATIOS, per outcome.
  2. anti_pattern_hits — count of anti-patterns (from lessons.ANTI_PATTERNS)
     in the draft text.
  3. draft_to_final_diff — change_percent from draft_final_pairs
     (decreasing → converging).

This is a meta-signal on learning health (INV-LRN4) — consumed by
dashboards / QA, not by the writer.
"""

from __future__ import annotations

import json
import logging
import re
from uuid import UUID

from legal_mcp.services import db
from legal_mcp.services.lessons import ANTI_PATTERNS, GOLDEN_RATIOS, canonical_outcome

logger = logging.getLogger(__name__)

# block_id → golden-ratio section
_BLOCK_TO_SECTION = {
    "block-vav": "background",
    "block-zayin": "claims",
    "block-yod": "discussion",
    "block-yod-alef": "summary",
}

# chunker section_type → golden-ratio section (for corpus measurement, T10)
_CHUNK_SECTION_TO_GOLDEN = {
    "facts": "background",
    "intro": "background",
    "appellant_claims": "claims",
    "respondent_claims": "claims",
    "legal_analysis": "discussion",
    "conclusion": "summary",
    "ruling": "summary",
}

# Process-wide memo for measure_corpus_ratios(); None until first computed.
_CORPUS_RATIOS_CACHE: dict | None = None


async def measure_corpus_ratios() -> dict:
    """Measure ACTUAL section %-of-total from Dafna's style_corpus.

    Averaged per outcome — the empirical counterpart to lessons.GOLDEN_RATIOS
    (T10). Splits each decision via the chunker (accurate, not the filtered
    exemplars). The result is cached for the lifetime of the process.

    Returns:
        ``{outcome: {"n": int, "sections": {section: pct}}}`` where ``n`` is
        the number of decisions in the bucket and ``pct`` is the mean
        percentage (rounded to one decimal). An ``"_all"`` bucket aggregates
        every decision that parsed.
    """
    global _CORPUS_RATIOS_CACHE
    if _CORPUS_RATIOS_CACHE is not None:
        return _CORPUS_RATIOS_CACHE

    # Imported at call time, as in the original layout of this module.
    from legal_mcp.services.chunker import _split_into_sections

    pool = await db.get_pool()
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            "SELECT full_text, outcome FROM style_corpus WHERE full_text <> ''"
        )

    # Per-outcome AND an "_all" aggregate. style_corpus.outcome is currently
    # unpopulated for the imported corpus, so per-outcome may be empty — "_all"
    # is the meaningful signal today, and per-outcome becomes live once outcomes
    # are backfilled. No silent loss: callers see which buckets have data via n.
    by_outcome: dict[str, list[dict]] = {}
    for r in rows:
        sect_words: dict[str, int] = {}
        for stype, stext in _split_into_sections(r["full_text"]):
            g = _CHUNK_SECTION_TO_GOLDEN.get(stype)
            if g:
                sect_words[g] = sect_words.get(g, 0) + len(stext.split())
        total = sum(sect_words.values())
        if total < 100:  # sections didn't parse — skip
            continue
        pct = {s: w / total * 100 for s, w in sect_words.items()}
        by_outcome.setdefault("_all", []).append(pct)
        outcome = canonical_outcome(r["outcome"] or "")
        if outcome:
            by_outcome.setdefault(outcome, []).append(pct)

    result: dict = {}
    for outcome, decs in by_outcome.items():
        # Every bucket holds at least one decision (buckets are only created
        # on append), so the mean is always defined. A section missing from a
        # decision counts as 0% for that decision.
        avg = {
            sec: round(sum(d.get(sec, 0.0) for d in decs) / len(decs), 1)
            for sec in ("background", "claims", "discussion", "summary")
        }
        result[outcome] = {"n": len(decs), "sections": avg}

    _CORPUS_RATIOS_CACHE = result
    return result


def count_anti_patterns(text: str) -> dict:
    """Count each anti-pattern occurrence in text. Lower = closer to Dafna.

    Args:
        text: Draft text to scan; ``None`` or empty is treated as ``""``.

    Returns:
        ``{"total": int, "by_pattern": {name: {"count": int, "note": ...}}}``.
        Only patterns with at least one match appear in ``by_pattern``.
    """
    hits = {}
    total = 0
    for ap in ANTI_PATTERNS:
        n = len(re.findall(ap["regex"], text or ""))
        if n:
            hits[ap["name"]] = {"count": n, "note": ap["note"]}
            total += n
    return {"total": total, "by_pattern": hits}


def golden_ratio_adherence(block_word_counts: dict[str, int], outcome: str) -> dict:
    """% of total per section vs GOLDEN_RATIOS target range.

    deviation=0 ⇒ within range.

    Args:
        block_word_counts: Mapping of block_id → word count. The percentage
            denominator is the sum over ALL entries, including block ids that
            do not map to a golden-ratio section.
        outcome: Decision outcome; normalised through ``canonical_outcome``.

    Returns:
        ``{"outcome", "total_words", "sections", "max_deviation"}``. When the
        outcome has no targets or there are no words, ``sections`` is empty
        and ``max_deviation`` is ``None``. Otherwise each section entry holds
        ``actual_pct``, ``target`` (``[lo, hi]``) and ``deviation_pp`` —
        percentage points outside the target range.
    """
    outcome = canonical_outcome(outcome)
    targets = GOLDEN_RATIOS.get(outcome)
    total = sum(block_word_counts.values())
    if not targets or total == 0:
        return {
            "outcome": outcome,
            "total_words": total,
            "sections": {},
            "max_deviation": None,
        }

    sections = {}
    max_dev = 0.0
    for block_id, section in _BLOCK_TO_SECTION.items():
        if section not in targets:
            continue
        pct = round(block_word_counts.get(block_id, 0) / total * 100, 1)
        lo, hi = targets[section]
        if pct < lo:
            dev = round(lo - pct, 1)
        elif pct > hi:
            dev = round(pct - hi, 1)
        else:
            dev = 0.0
        max_dev = max(max_dev, dev)
        sections[section] = {
            "actual_pct": pct,
            "target": [lo, hi],
            "deviation_pp": dev,
        }
    return {
        "outcome": outcome,
        "total_words": total,
        "sections": sections,
        "max_deviation": max_dev,
    }


async def style_distance(case_number: str) -> dict:
    """Assemble the 3 style-distance components for one case (T7).

    Args:
        case_number: Case number passed to ``db.get_case_by_number``.

    Returns:
        ``{"error": ...}`` when the case is not found. Otherwise a dict with
        ``case_number``, ``outcome``, ``golden_ratio_adherence``,
        ``anti_pattern_hits``, ``draft_to_final_diff``, ``pair_status`` and a
        flat ``summary`` of the three headline numbers.
    """
    case = await db.get_case_by_number(case_number)
    if not case:
        return {"error": f"case {case_number} not found"}
    case_id = UUID(case["id"])

    decision = await db.get_decision_by_case(case_id)
    # dict.get's default only applies when the key is ABSENT; a decision whose
    # outcome is NULL would otherwise hand None to canonical_outcome. Fall back
    # to "rejection" exactly as for a missing decision (same idea as the
    # `or ""` guard in measure_corpus_ratios).
    outcome = (decision or {}).get("outcome") or "rejection"

    pool = await db.get_pool()
    async with pool.acquire() as conn:
        block_rows = []
        draft_text = ""
        if decision:
            block_rows = await conn.fetch(
                "SELECT block_id, content, word_count FROM decision_blocks "
                "WHERE decision_id = $1 ORDER BY block_index",
                UUID(decision["id"]),
            )
            draft_text = "\n\n".join(b["content"] for b in block_rows if b["content"])

        pair = await conn.fetchrow(
            "SELECT draft_text, diff_stats, status FROM draft_final_pairs "
            "WHERE case_id = $1 ORDER BY created_at DESC LIMIT 1",
            case_id,
        )
        # Prefer the immutable snapshot's draft text when present.
        if pair and pair["draft_text"]:
            draft_text = pair["draft_text"]

    word_counts = {b["block_id"]: (b["word_count"] or 0) for b in block_rows}
    ratios = golden_ratio_adherence(word_counts, outcome)
    anti = count_anti_patterns(draft_text)

    diff = None
    if pair and pair["diff_stats"]:
        raw = pair["diff_stats"]
        # diff_stats may arrive as a JSON string rather than a decoded object;
        # an undecodable payload is reported as "no diff", not as an error.
        if isinstance(raw, str):
            try:
                raw = json.loads(raw)
            except (json.JSONDecodeError, TypeError):
                raw = None
        diff = raw

    # Only a JSON object can carry change_percent; a list/scalar payload must
    # not crash the summary.
    change_percent = diff.get("change_percent") if isinstance(diff, dict) else None

    return {
        "case_number": case_number,
        "outcome": canonical_outcome(outcome),
        "golden_ratio_adherence": ratios,
        "anti_pattern_hits": anti,
        "draft_to_final_diff": diff,
        "pair_status": pair["status"] if pair else None,
        "summary": {
            "ratio_max_deviation_pp": ratios.get("max_deviation"),
            "anti_pattern_total": anti["total"],
            "change_percent": change_percent,
        },
    }