style_lesson_panel.py: before writing 2/2-keep lessons, skip any whose normalized
lesson_text already exists on the corpus (any source), and collapse duplicates within
a run. This makes the run-learning button safe to click repeatedly (the curator may
re-run the pipeline): the run converges instead of piling up duplicate decision_lessons.
Verified on בל"מ 8126-03-25: re-running --apply with 7 existing lessons wrote 0
("1 כפילויות דולגו", i.e. "1 duplicates skipped"), and the count stayed at 7.
Invariants: INV-LRN1/G10 unchanged (proposals only, manual fold).
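The dedup rule described above can be sketched as follows. This is a minimal, self-contained illustration and not the script's own code: `normalize` and `select_fresh` are hypothetical names, and an in-memory set stands in for the decision_lessons rows already attached to the corpus.

```python
def normalize(text: str) -> str:
    """Collapse runs of whitespace and strip, so trivially different copies compare equal."""
    return " ".join((text or "").split())


def select_fresh(candidates: list[str], existing: set[str]) -> list[str]:
    """Keep candidates whose normalized text is neither stored already nor seen earlier in this run."""
    seen = set(existing)
    fresh = []
    for text in candidates:
        key = normalize(text)
        if not key or key in seen:
            continue  # empty, already on the corpus, or a repeat within this run
        seen.add(key)
        fresh.append(text)
    return fresh


# Assumed stand-in for lessons already attached to the corpus.
stored = {normalize("Open with the ruling, then the reasoning.")}
batch = [
    "Open with the   ruling, then the reasoning.",  # already stored (differs only in spacing)
    "Prefer short transition phrases.",
    "Prefer short transition phrases. ",            # duplicate within the same run
]

first = select_fresh(batch, stored)          # first --apply: one new lesson
stored |= {normalize(t) for t in first}      # simulate the write
second = select_fresh(batch, stored)         # second --apply: nothing left to write
print(first)
print(second)
```

Because the second pass starts from a set that already includes the first pass's writes, repeating the run adds nothing, which is the convergence the change is after.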
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
355 lines
15 KiB
Python
#!/usr/bin/env python3
"""Two-judge panel to vet distilled STYLE lessons: "double learning" (DeepSeek + Gemini).

The voice-learning loop (ingest_final_version → learning_loop.analyze_changes) uses
Opus to distill, from a draft→final diff, lessons on HOW דפנה writes. Each lesson is stored
as a PROPOSAL in draft_final_pairs.analysis.changes[]. Per INV-LRN1/G10 those are NOT
auto-committed to the writer-consumed knowledge (SKILL.md / legal-decision-lessons.md).

This panel adds a SECOND independent layer on top of Opus's distillation: two judges
of different lineage vote per lesson on the coarse, reliable question: "is this an
ABSTRACT, generalizable STYLE/METHOD lesson (INV-LRN5: voice, not legal substance)?"

  - deepseek (api.deepseek.com)   [DeepSeek, same family as the Hermes curator]
  - gemini   (gemini-2.5-flash)   [Google, #1 on LegalBench]

(No Claude judge here: Opus already produced the proposal; the panel's job is an
independent cross-check, so we use the two NON-Opus lineages = "double learning".)

Agreement policy (mirrors halacha_panel_approve.py, reversible + CSV-backed):
  - 2/2 keep            → create a decision_lesson row (source='panel:deepseek+gemini')
  - 2/2 drop            → recorded as panel-rejected, NOT written
  - split / incomplete  → ESCALATE to chair (printed + in the JSON report)

Distilled lessons tagged domain='substance' are skipped outright (INV-LRN5: legal
substance must never enter the voice knowledge layer).

Final fold into SKILL.md / legal-decision-lessons.md stays a MANUAL chair gate (G10);
this panel only creates decision_lesson *proposals* attached to the case's style_corpus row.

Local-only (DeepSeek/Gemini keys live on the host, like the halacha panel).

    cd ~/legal-ai/mcp-server
    .venv/bin/python ../scripts/style_lesson_panel.py --case 8126-03-25            # dry-run
    .venv/bin/python ../scripts/style_lesson_panel.py --case 8126-03-25 --apply    # write proposals
    .venv/bin/python ../scripts/style_lesson_panel.py --pair-id <uuid> --apply
"""
from __future__ import annotations

import argparse
import asyncio
import csv
import json
import os
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path
from uuid import UUID

import httpx

from legal_mcp.services import db


# ── keys (local files, same pattern as halacha_panel_approve.py) ──

def _env_key(name: str, *files: str) -> str:
    """Return NAME's value from the first listed env file that defines it, else from os.environ."""
    for f in files:
        p = Path(f).expanduser()
        if p.exists():
            for line in p.read_text().splitlines():
                if line.startswith(name + "="):
                    return line.split("=", 1)[1].strip()
    return os.environ.get(name, "")


DEEPSEEK_KEY = _env_key("DEEPSEEK_API_KEY", "~/.hermes/profiles/deepseek/.env", "~/.env")
GEMINI_KEY = _env_key("GOOGLE_GEMINI_API_KEY", "~/.env") or _env_key("GEMINI_API_KEY", "~/.env")


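# ── the coarse question (the reliable axis: generalizable style, not substance) ──

# English gloss of the Hebrew prompt below (the prompt itself is sent as-is):
#   You are a senior legal-language editor analyzing the writing style of an appeals-committee
#   chair. Given a style lesson extracted by comparing a draft with the final version, decide
#   whether it should be kept as a generalizable style guideline for future cases.
#   keep=true: an abstract insight on voice/tone/structure/rhythm/transition phrases/wording
#   that applies to other cases. keep=false: case-specific legal substance (rule, fact,
#   case-dependent ruling), a one-off detail, or a restatement without stylistic abstraction.
#   Return JSON only: {"keep": true/false, "reason": "<short sentence>"}. No markdown.
KEEP_SYSTEM = (
    "אתה עורך-לשון משפטי בכיר המנתח את סגנון-הכתיבה של יו\"ר ועדת ערר. בהינתן לקח-סגנון "
    "שחולץ מהשוואת טיוטה לגרסה הסופית, הכרע אם הוא ראוי להישמר כהנחיית-סגנון בת-הכללה "
    "לתיקים עתידיים. ראוי (keep=true) = תובנה מופשטת על קול/טון/מבנה/מקצב/ביטויי-מעבר/ניסוח "
    "שניתן להחילה על תיקים אחרים. לא-ראוי (keep=false) = מהות משפטית ספציפית (הלכה, עובדה, "
    "הכרעה תלוית-תיק), פרט חד-פעמי, או חזרה על תוכן ההחלטה ללא הפשטה סגנונית. "
    'החזר JSON בלבד: {"keep": true/false, "reason": "<משפט קצר>"}. ללא markdown.'
)

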
def _keep_user(change: dict) -> str:
    """Build the user message for one distilled change (labels are in Hebrew for the judges)."""
    desc = change.get("description") or ""
    lesson = change.get("lesson") or ""
    block = change.get("block_id") or ""
    ctype = change.get("type") or ""
    # Labels: "change type", "block", "description of the change", "the proposed lesson".
    return (
        f"סוג השינוי: {ctype}\n"
        f"בלוק: {block}\n\n"
        f"תיאור השינוי:\n{desc}\n\n"
        f"הלקח שהוצע:\n{lesson}"
    )


def _lesson_text(change: dict) -> str:
    """The text we would persist as a decision_lesson; prefers the distilled lesson."""
    return (change.get("lesson") or change.get("description") or "").strip()


def _category(change: dict) -> str:
    """Map a distilled change to a decision_lessons.category (style/structure/lexicon/...)."""
    # Only the change type drives the category; the first matching keyword wins.
    t = (change.get("type") or "").lower()
    if "transition" in t or "ביטוי" in t or "מעבר" in t:
        return "lexicon"
    if "structure" in t or "מבנה" in t or "order" in t:
        return "structure"
    if "table" in t or "טבל" in t:
        return "tabular"
    return "style"


# ── two judges, one signature: (system, user) -> dict|None ──

async def judge_deepseek(client: httpx.AsyncClient, system: str, user: str) -> dict | None:
    if not DEEPSEEK_KEY:
        return None
    try:
        r = await client.post(
            "https://api.deepseek.com/v1/chat/completions",
            headers={"Authorization": f"Bearer {DEEPSEEK_KEY}", "Content-Type": "application/json"},
            json={"model": "deepseek-chat", "temperature": 0, "max_tokens": 160,
                  "response_format": {"type": "json_object"},
                  "messages": [{"role": "system", "content": system},
                               {"role": "user", "content": user}]},
            timeout=90,
        )
        r.raise_for_status()
        return json.loads(r.json()["choices"][0]["message"]["content"])
    except Exception:
        # Any failure (network, HTTP status, malformed JSON) counts as "no vote".
        return None


async def judge_gemini(client: httpx.AsyncClient, system: str, user: str) -> dict | None:
    if not GEMINI_KEY:
        return None
    try:
        r = await client.post(
            f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent?key={GEMINI_KEY}",
            headers={"Content-Type": "application/json"},
            json={"system_instruction": {"parts": [{"text": system}]},
                  "contents": [{"parts": [{"text": user}]}],
                  "generationConfig": {"temperature": 0, "maxOutputTokens": 4000,
                                       "responseMimeType": "application/json"}},
            timeout=90,
        )
        r.raise_for_status()
        return json.loads(r.json()["candidates"][0]["content"]["parts"][0]["text"])
    except Exception:
        # Any failure (network, HTTP status, malformed JSON) counts as "no vote".
        return None


def _bool(d: dict | None, key: str) -> bool | None:
    """Read d[key] as a bool; None means the judge gave no usable answer."""
    if not isinstance(d, dict) or key not in d:
        return None
    v = d[key]
    if isinstance(v, bool):
        return v
    # Tolerate stringly-typed answers such as "true", "1", "yes" or "כן" (Hebrew "yes").
    return str(v).strip().lower() in ("true", "1", "yes", "כן")


async def panel_vote(client, system, user, key) -> dict:
    """Run the two judges; return per-judge bools + the verdict."""
    ds, gm = await asyncio.gather(
        judge_deepseek(client, system, user),
        judge_gemini(client, system, user),
    )
    votes = {"deepseek": _bool(ds, key), "gemini": _bool(gm, key)}
    valid = [v for v in votes.values() if v is not None]
    agree_yes = len(valid) == 2 and all(valid)
    agree_no = len(valid) == 2 and not any(valid)
    votes["_verdict"] = ("agree_yes" if agree_yes else
                         "agree_no" if agree_no else
                         "split" if len(valid) == 2 else "incomplete")
    return votes


# ── inputs: pair (analysis.changes) + the case's style_corpus row ──

def _as_dict(v):
    """analysis/diff_stats come back from asyncpg jsonb as str; normalize to dict."""
    if isinstance(v, str):
        try:
            return json.loads(v)
        except Exception:
            return {}
    return v or {}


async def _resolve_corpus_id(decision_number: str) -> str | None:
    """The case's final must be in style_corpus for lessons to attach (FK)."""
    pool = await db.get_pool()
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            "SELECT id FROM style_corpus WHERE decision_number = $1 "
            "ORDER BY created_at DESC LIMIT 1",
            decision_number,
        )
    return str(row["id"]) if row else None


def _norm(text: str) -> str:
    """Normalize a lesson for dedup: collapse whitespace, strip."""
    return " ".join((text or "").split())


async def _existing_lesson_texts(corpus_id: str) -> set[str]:
    """Normalized lesson_texts already attached to this corpus (any source),
    so re-running --apply is idempotent and never duplicates a lesson."""
    pool = await db.get_pool()
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            "SELECT lesson_text FROM decision_lessons WHERE style_corpus_id = $1",
            UUID(corpus_id),
        )
    return {_norm(r["lesson_text"]) for r in rows}


async def _load_pair(args) -> dict | None:
    if args.pair_id:
        return await db.get_draft_final_pair(UUID(args.pair_id))
    # by case → latest pair for that case (this relies on list_draft_final_pairs
    # returning newest first; main() checks that the pair is actually analyzed)
    pairs = await db.list_draft_final_pairs(limit=500)
    matches = [p for p in pairs if p.get("case_number") == args.case]
    if not matches:
        return None
    # list_draft_final_pairs has no analysis; re-fetch the full row
    return await db.get_draft_final_pair(matches[0]["id"])


async def main(args: argparse.Namespace) -> int:
    print(f"judges available — deepseek:{bool(DEEPSEEK_KEY)} gemini:{bool(GEMINI_KEY)}\n",
          flush=True)
    if not (DEEPSEEK_KEY and GEMINI_KEY):
        # Not fatal: with a key missing every lesson ends up "incomplete" and nothing is written.
        print("⚠ both DeepSeek and Gemini keys are required for the 2/2 panel.", flush=True)

    pair = await _load_pair(args)
    if not pair:
        print(f"no draft_final_pair found for {args.pair_id or args.case}", flush=True)
        return 1
    if pair.get("status") != "analyzed" or not pair.get("analysis"):
        print(f"pair {pair['id']} not analyzed yet (status={pair.get('status')}). "
              f"Run ingest_final_version first.", flush=True)
        return 1

    analysis = _as_dict(pair["analysis"])
    changes = analysis.get("changes") or []
    case_number = pair.get("case_number") or args.case or ""
    if args.limit:
        changes = changes[: args.limit]

    # INV-LRN5: substance never enters the voice layer.
    substance = [c for c in changes if (c.get("domain") or "").lower() == "substance"]
    style_changes = [c for c in changes if (c.get("domain") or "").lower() != "substance"]
    print(f"pair {pair['id']} ({case_number}): {len(changes)} changes "
          f"→ {len(style_changes)} style / {len(substance)} substance(skipped)\n", flush=True)

    sem = asyncio.Semaphore(args.concurrency)
    results: list[dict] = []
    async with httpx.AsyncClient() as client:
        async def run(ch):
            async with sem:
                v = await panel_vote(client, KEEP_SYSTEM, _keep_user(ch), "keep")
                v["_change"] = ch
                results.append(v)

        tasks = [run(c) for c in style_changes]
        for i in range(0, len(tasks), args.concurrency):
            await asyncio.gather(*tasks[i : i + args.concurrency])
            print(f" …{len(results)}/{len(tasks)} judged", flush=True)

    cc = Counter(r["_verdict"] for r in results)
    print("\n" + "=" * 60)
    print(f"STYLE-LESSON PANEL {'(APPLY)' if args.apply else '(DRY-RUN — no DB writes)'}")
    print("=" * 60)
    print(f" ✓ keep (2/2): {cc['agree_yes']}")
    print(f" ✗ drop (2/2): {cc['agree_no']}")
    print(f" → CHAIR (split): {cc['split']}")
    print(f" ? incomplete: {cc['incomplete']}")
    print(f" ⊘ substance skipped: {len(substance)}")

    report = [{"verdict": r["_verdict"], "deepseek": r["deepseek"], "gemini": r["gemini"],
               "category": _category(r["_change"]), "lesson": _lesson_text(r["_change"])[:200]}
              for r in results]
    Path("/tmp/style_lesson_panel.json").write_text(
        json.dumps(report, ensure_ascii=False, indent=1))
    print("\nper-lesson verdicts → /tmp/style_lesson_panel.json")

    if not args.apply:
        print("\n(dry-run — pass --apply to write keep-lessons as decision_lesson proposals)")
        return 0

    # ── apply: write 2/2-keep as decision_lesson proposals (reversible) ──
    corpus_id = await _resolve_corpus_id(case_number)
    if not corpus_id:
        print(f"\n✗ no style_corpus row for decision_number={case_number!r}; cannot attach "
              f"lessons. Add the final to the style corpus first (document_upload_training).")
        return 1

    keeps = [r for r in results if r["_verdict"] == "agree_yes" and _lesson_text(r["_change"])]

    # Idempotency / dedup: skip keeps already attached to the corpus (any source),
    # and collapse duplicates WITHIN this run. Re-running --apply writes nothing new.
    existing = await _existing_lesson_texts(corpus_id)
    fresh, seen = [], set(existing)
    for r in keeps:
        n = _norm(_lesson_text(r["_change"]))
        if n in seen:
            continue
        seen.add(n)
        fresh.append(r)
    skipped_dup = len(keeps) - len(fresh)

    # CSV backup of exactly the rows about to be written, saved before touching the DB.
    ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    audit = Path(__file__).resolve().parent.parent / "data" / "audit"
    audit.mkdir(parents=True, exist_ok=True)
    backup = audit / f"style-panel-apply-{case_number}-{ts}.csv"
    with backup.open("w", encoding="utf-8", newline="") as f:
        w = csv.writer(f)
        w.writerow(["corpus_id", "category", "source", "lesson_text"])
        for r in fresh:
            w.writerow([corpus_id, _category(r["_change"]), "panel:deepseek+gemini",
                        _lesson_text(r["_change"])])

    written = 0
    for r in fresh:
        await db.add_decision_lesson(
            UUID(corpus_id),
            lesson_text=_lesson_text(r["_change"]),
            category=_category(r["_change"]),
            source="panel:deepseek+gemini",
            created_by="panel",
        )
        written += 1

    chair = cc["split"] + cc["incomplete"]
    # "כפילויות דולגו" in the summary line is Hebrew for "duplicates skipped".
    print(f"\nAPPLIED (reversible): wrote {written} decision_lesson proposals "
          f"(source=panel:deepseek+gemini) · {skipped_dup} כפילויות דולגו · "
          f"{chair} escalated to chair · {len(substance)} substance skipped")
    print(f"backup → {backup}")
    print("NB: fold into SKILL.md / legal-decision-lessons.md stays a manual chair gate (INV-G10).")
    return 0


if __name__ == "__main__":
    ap = argparse.ArgumentParser(description=__doc__,
                                 formatter_class=argparse.RawDescriptionHelpFormatter)
    g = ap.add_mutually_exclusive_group(required=True)
    g.add_argument("--case", help="case_number — uses its latest analyzed draft_final_pair")
    g.add_argument("--pair-id", dest="pair_id", help="explicit draft_final_pairs.id")
    ap.add_argument("--apply", action="store_true", help="write keep-lessons (default: dry-run)")
    ap.add_argument("--limit", type=int, default=0)
    ap.add_argument("--concurrency", type=int, default=4)
    raise SystemExit(asyncio.run(main(ap.parse_args())))