#!/usr/bin/env python3 """Two-judge panel to vet distilled STYLE lessons — "double learning" (DeepSeek + Gemini). The voice-learning loop (ingest_final_version → learning_loop.analyze_changes) uses Opus to distill, from a draft→final diff, lessons on HOW דפנה writes — stored as a PROPOSAL in draft_final_pairs.analysis.changes[]. Per INV-LRN1/G10 those are NOT auto-committed to the writer-consumed knowledge (SKILL.md / legal-decision-lessons.md). This panel adds a SECOND independent layer on top of Opus's distillation — two judges of different lineage vote per lesson on the coarse, reliable question: "is this an ABSTRACT, generalizable STYLE/METHOD lesson (INV-LRN5 — voice, not legal substance)?" - deepseek (api.deepseek.com) [DeepSeek — same family as the Hermes curator] - gemini (gemini-2.5-flash) [Google — #1 on LegalBench] (No Claude judge here: Opus already produced the proposal; the panel's job is an independent cross-check, so we use the two NON-Opus lineages = "double learning".) Agreement policy (mirrors halacha_panel_approve.py, reversible + CSV-backed): - 2/2 keep → create a decision_lesson row (source='panel:deepseek+gemini') - 2/2 drop → recorded as panel-rejected, NOT written - split / incomplete / substance → ESCALATE to chair (printed + in the JSON report) Distilled lessons tagged domain='substance' are skipped outright (INV-LRN5 — legal substance must never enter the voice knowledge layer). Final fold into SKILL.md / legal-decision-lessons.md stays a MANUAL chair gate (G10); this panel only creates decision_lesson *proposals* attached to the case's style_corpus row. Local-only (DeepSeek/Gemini keys live on the host, like the halacha panel). cd ~/legal-ai/mcp-server .venv/bin/python ../scripts/style_lesson_panel.py --case 8126-03-25 # dry-run .venv/bin/python ../scripts/style_lesson_panel.py --case 8126-03-25 --apply # write proposals .venv/bin/python ../scripts/style_lesson_panel.py --pair-id --apply """ from __future__ import annotations import argparse import asyncio import csv import json import os from collections import Counter from datetime import datetime, timezone from pathlib import Path from uuid import UUID import httpx from legal_mcp.services import db # ── keys (local files, same pattern as halacha_panel_approve.py) ── def _env_key(name: str, *files: str) -> str: for f in files: p = Path(f).expanduser() if p.exists(): for line in p.read_text().splitlines(): if line.startswith(name + "="): return line.split("=", 1)[1].strip() return os.environ.get(name, "") DEEPSEEK_KEY = _env_key("DEEPSEEK_API_KEY", "~/.hermes/profiles/deepseek/.env", "~/.env") GEMINI_KEY = _env_key("GOOGLE_GEMINI_API_KEY", "~/.env") or _env_key("GEMINI_API_KEY", "~/.env") # ── the coarse question (the reliable axis — generalizable style, not substance) ── KEEP_SYSTEM = ( "אתה עורך-לשון משפטי בכיר המנתח את סגנון-הכתיבה של יו\"ר ועדת ערר. בהינתן לקח-סגנון " "שחולץ מהשוואת טיוטה לגרסה הסופית, הכרע אם הוא ראוי להישמר כהנחיית-סגנון בת-הכללה " "לתיקים עתידיים. ראוי (keep=true) = תובנה מופשטת על קול/טון/מבנה/מקצב/ביטויי-מעבר/ניסוח " "שניתן להחילה על תיקים אחרים. לא-ראוי (keep=false) = מהות משפטית ספציפית (הלכה, עובדה, " "הכרעה תלוית-תיק), פרט חד-פעמי, או חזרה על תוכן ההחלטה ללא הפשטה סגנונית. " 'החזר JSON בלבד: {"keep": true/false, "reason": "<משפט קצר>"}. ללא markdown.' ) def _keep_user(change: dict) -> str: desc = change.get("description") or "" lesson = change.get("lesson") or "" block = change.get("block_id") or "" ctype = change.get("type") or "" return ( f"סוג השינוי: {ctype}\n" f"בלוק: {block}\n\n" f"תיאור השינוי:\n{desc}\n\n" f"הלקח שהוצע:\n{lesson}" ) def _lesson_text(change: dict) -> str: """The text we would persist as a decision_lesson — prefer the distilled lesson.""" return (change.get("lesson") or change.get("description") or "").strip() def _category(change: dict) -> str: """Map a distilled change to a decision_lessons.category (style/structure/lexicon/...).""" t = (change.get("type") or "").lower() block = (change.get("block_id") or "").lower() if "transition" in t or "ביטוי" in t or "מעבר" in t: return "lexicon" if "structure" in t or "מבנה" in t or "order" in t: return "structure" if "table" in t or "טבל" in t: return "tabular" return "style" # ── two judges, one signature: (system, user) -> dict|None ── async def judge_deepseek(client: httpx.AsyncClient, system: str, user: str) -> dict | None: if not DEEPSEEK_KEY: return None try: r = await client.post( "https://api.deepseek.com/v1/chat/completions", headers={"Authorization": f"Bearer {DEEPSEEK_KEY}", "Content-Type": "application/json"}, json={"model": "deepseek-chat", "temperature": 0, "max_tokens": 160, "response_format": {"type": "json_object"}, "messages": [{"role": "system", "content": system}, {"role": "user", "content": user}]}, timeout=90, ) r.raise_for_status() return json.loads(r.json()["choices"][0]["message"]["content"]) except Exception: return None async def judge_gemini(client: httpx.AsyncClient, system: str, user: str) -> dict | None: if not GEMINI_KEY: return None try: r = await client.post( f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent?key={GEMINI_KEY}", headers={"Content-Type": "application/json"}, json={"system_instruction": {"parts": [{"text": system}]}, "contents": [{"parts": [{"text": user}]}], "generationConfig": {"temperature": 0, "maxOutputTokens": 4000, "responseMimeType": "application/json"}}, timeout=90, ) r.raise_for_status() return json.loads(r.json()["candidates"][0]["content"]["parts"][0]["text"]) except Exception: return None def _bool(d: dict | None, key: str) -> bool | None: if not isinstance(d, dict) or key not in d: return None v = d[key] if isinstance(v, bool): return v return str(v).strip().lower() in ("true", "1", "yes", "כן") async def panel_vote(client, system, user, key) -> dict: """Run the two judges; return per-judge bools + the verdict.""" ds, gm = await asyncio.gather( judge_deepseek(client, system, user), judge_gemini(client, system, user), ) votes = {"deepseek": _bool(ds, key), "gemini": _bool(gm, key)} valid = [v for v in votes.values() if v is not None] agree_yes = len(valid) == 2 and all(valid) agree_no = len(valid) == 2 and not any(valid) votes["_verdict"] = ("agree_yes" if agree_yes else "agree_no" if agree_no else "split" if len(valid) == 2 else "incomplete") return votes # ── inputs: pair (analysis.changes) + the case's style_corpus row ── def _as_dict(v): """analysis/diff_stats come back from asyncpg jsonb as str — normalize to dict.""" if isinstance(v, str): try: return json.loads(v) except Exception: return {} return v or {} async def _resolve_corpus_id(decision_number: str) -> str | None: """The case's final must be in style_corpus for lessons to attach (FK).""" pool = await db.get_pool() async with pool.acquire() as conn: row = await conn.fetchrow( "SELECT id FROM style_corpus WHERE decision_number = $1 " "ORDER BY created_at DESC LIMIT 1", decision_number, ) return str(row["id"]) if row else None async def _load_pair(args) -> dict | None: if args.pair_id: return await db.get_draft_final_pair(UUID(args.pair_id)) # by case → latest analyzed pair pairs = await db.list_draft_final_pairs(limit=500) matches = [p for p in pairs if p.get("case_number") == args.case] if not matches: return None # list_draft_final_pairs has no analysis; re-fetch the full row return await db.get_draft_final_pair(matches[0]["id"]) async def main(args: argparse.Namespace) -> int: print(f"judges available — deepseek:{bool(DEEPSEEK_KEY)} gemini:{bool(GEMINI_KEY)}\n", flush=True) if not (DEEPSEEK_KEY and GEMINI_KEY): print("⚠ both DeepSeek and Gemini keys are required for the 2/2 panel.", flush=True) pair = await _load_pair(args) if not pair: print(f"no draft_final_pair found for {args.pair_id or args.case}", flush=True) return 1 if pair.get("status") != "analyzed" or not pair.get("analysis"): print(f"pair {pair['id']} not analyzed yet (status={pair.get('status')}). " f"Run ingest_final_version first.", flush=True) return 1 analysis = _as_dict(pair["analysis"]) changes = analysis.get("changes") or [] case_number = pair.get("case_number") or args.case or "" if args.limit: changes = changes[: args.limit] # INV-LRN5: substance never enters the voice layer. substance = [c for c in changes if (c.get("domain") or "").lower() == "substance"] style_changes = [c for c in changes if (c.get("domain") or "").lower() != "substance"] print(f"pair {pair['id']} ({case_number}): {len(changes)} changes " f"→ {len(style_changes)} style / {len(substance)} substance(skipped)\n", flush=True) sem = asyncio.Semaphore(args.concurrency) results: list[dict] = [] async with httpx.AsyncClient() as client: async def run(ch): async with sem: v = await panel_vote(client, KEEP_SYSTEM, _keep_user(ch), "keep") v["_change"] = ch results.append(v) tasks = [run(c) for c in style_changes] for i in range(0, len(tasks), args.concurrency): await asyncio.gather(*tasks[i : i + args.concurrency]) print(f" …{len(results)}/{len(tasks)} judged", flush=True) cc = Counter(r["_verdict"] for r in results) print("\n" + "=" * 60) print(f"STYLE-LESSON PANEL {'(APPLY)' if args.apply else '(DRY-RUN — no DB writes)'}") print("=" * 60) print(f" ✓ keep (2/2): {cc['agree_yes']}") print(f" ✗ drop (2/2): {cc['agree_no']}") print(f" → CHAIR (split): {cc['split']}") print(f" ? incomplete: {cc['incomplete']}") print(f" ⊘ substance skipped: {len(substance)}") report = [{"verdict": r["_verdict"], "deepseek": r["deepseek"], "gemini": r["gemini"], "category": _category(r["_change"]), "lesson": _lesson_text(r["_change"])[:200]} for r in results] Path("/tmp/style_lesson_panel.json").write_text( json.dumps(report, ensure_ascii=False, indent=1)) print("\nper-lesson verdicts → /tmp/style_lesson_panel.json") if not args.apply: print("\n(dry-run — pass --apply to write keep-lessons as decision_lesson proposals)") return 0 # ── apply: write 2/2-keep as decision_lesson proposals (reversible) ── corpus_id = await _resolve_corpus_id(case_number) if not corpus_id: print(f"\n✗ no style_corpus row for decision_number={case_number!r}; cannot attach " f"lessons. Add the final to the style corpus first (document_upload_training).") return 1 keeps = [r for r in results if r["_verdict"] == "agree_yes" and _lesson_text(r["_change"])] ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") audit = Path(__file__).resolve().parent.parent / "data" / "audit" audit.mkdir(parents=True, exist_ok=True) backup = audit / f"style-panel-apply-{case_number}-{ts}.csv" with backup.open("w", encoding="utf-8", newline="") as f: w = csv.writer(f) w.writerow(["corpus_id", "category", "source", "lesson_text"]) for r in keeps: w.writerow([corpus_id, _category(r["_change"]), "panel:deepseek+gemini", _lesson_text(r["_change"])]) written = 0 for r in keeps: await db.add_decision_lesson( UUID(corpus_id), lesson_text=_lesson_text(r["_change"]), category=_category(r["_change"]), source="panel:deepseek+gemini", created_by="panel", ) written += 1 chair = cc["split"] + cc["incomplete"] print(f"\nAPPLIED (reversible): wrote {written} decision_lesson proposals " f"(source=panel:deepseek+gemini) · {chair} escalated to chair · " f"{len(substance)} substance skipped") print(f"backup → {backup}") print("NB: fold into SKILL.md / legal-decision-lessons.md stays a manual chair gate (INV-G10).") return 0 if __name__ == "__main__": ap = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) g = ap.add_mutually_exclusive_group(required=True) g.add_argument("--case", help="case_number — uses its latest analyzed draft_final_pair") g.add_argument("--pair-id", dest="pair_id", help="explicit draft_final_pairs.id") ap.add_argument("--apply", action="store_true", help="write keep-lessons (default: dry-run)") ap.add_argument("--limit", type=int, default=0) ap.add_argument("--concurrency", type=int, default=4) raise SystemExit(asyncio.run(main(ap.parse_args())))