Files
legal-ai/scripts/style_lesson_panel.py
Chaim c8344342a8 fix(style-panel): idempotency + dedup — re-running --apply never duplicates lessons
style_lesson_panel.py: before writing 2/2-keep lessons, skip any whose normalized
lesson_text already exists on the corpus (any source), and collapse duplicates within
a run. Makes the run-learning button safe to click repeatedly (the curator may re-run
the pipeline) — it converges instead of piling up duplicate decision_lessons.

Verified on בל"מ 8126-03-25: re-running --apply with 7 existing lessons wrote 0
("1 כפילויות דולגו"), count stayed 7.

Invariants: INV-LRN1/G10 unchanged (proposals only, manual fold).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-08 10:57:57 +00:00

355 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Two-judge panel to vet distilled STYLE lessons — "double learning" (DeepSeek + Gemini).
The voice-learning loop (ingest_final_version → learning_loop.analyze_changes) uses
Opus to distill, from a draft→final diff, lessons on HOW דפנה writes — stored as a
PROPOSAL in draft_final_pairs.analysis.changes[]. Per INV-LRN1/G10 those are NOT
auto-committed to the writer-consumed knowledge (SKILL.md / legal-decision-lessons.md).
This panel adds a SECOND independent layer on top of Opus's distillation — two judges
of different lineage vote per lesson on the coarse, reliable question: "is this an
ABSTRACT, generalizable STYLE/METHOD lesson (INV-LRN5 — voice, not legal substance)?"
- deepseek (api.deepseek.com) [DeepSeek — same family as the Hermes curator]
- gemini (gemini-2.5-flash) [Google — #1 on LegalBench]
(No Claude judge here: Opus already produced the proposal; the panel's job is an
independent cross-check, so we use the two NON-Opus lineages = "double learning".)
Agreement policy (mirrors halacha_panel_approve.py, reversible + CSV-backed):
- 2/2 keep → create a decision_lesson row (source='panel:deepseek+gemini')
- 2/2 drop → recorded as panel-rejected, NOT written
- split / incomplete / substance → ESCALATE to chair (printed + in the JSON report)
Distilled lessons tagged domain='substance' are skipped outright (INV-LRN5 — legal
substance must never enter the voice knowledge layer).
Final fold into SKILL.md / legal-decision-lessons.md stays a MANUAL chair gate (G10);
this panel only creates decision_lesson *proposals* attached to the case's style_corpus row.
Local-only (DeepSeek/Gemini keys live on the host, like the halacha panel).
cd ~/legal-ai/mcp-server
.venv/bin/python ../scripts/style_lesson_panel.py --case 8126-03-25 # dry-run
.venv/bin/python ../scripts/style_lesson_panel.py --case 8126-03-25 --apply # write proposals
.venv/bin/python ../scripts/style_lesson_panel.py --pair-id <uuid> --apply
"""
from __future__ import annotations

import argparse
import asyncio
import csv
import json
import os
import unicodedata
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path
from uuid import UUID

import httpx

from legal_mcp.services import db
# ── keys (local files, same pattern as halacha_panel_approve.py) ──
def _env_key(name: str, *files: str) -> str:
for f in files:
p = Path(f).expanduser()
if p.exists():
for line in p.read_text().splitlines():
if line.startswith(name + "="):
return line.split("=", 1)[1].strip()
return os.environ.get(name, "")
# Judge API keys, resolved once at import time (key files first, then the environment).
DEEPSEEK_KEY = _env_key(
    "DEEPSEEK_API_KEY",
    "~/.hermes/profiles/deepseek/.env",
    "~/.env",
)
# Gemini: GOOGLE_GEMINI_API_KEY is tried first; GEMINI_API_KEY is the fallback name.
GEMINI_KEY = (
    _env_key("GOOGLE_GEMINI_API_KEY", "~/.env")
    or _env_key("GEMINI_API_KEY", "~/.env")
)
# ── the coarse question (the reliable axis — generalizable style, not substance) ──
# System prompt sent to both judges (Hebrew). English gist: "You are a senior legal
# copy-editor analysing the writing style of an appeals-committee chair. Given a
# style lesson extracted from comparing a draft with the final version, decide
# whether it should be kept as a generalizable style guideline for future cases.
# keep=true = an abstract insight on voice/tone/structure/rhythm/transition
# phrases/wording that applies to other cases; keep=false = specific legal
# substance (a legal rule, a fact, a case-dependent ruling), a one-off detail, or
# a restatement of the decision's content without stylistic abstraction.
# Return JSON only: {"keep": true/false, "reason": "<short sentence>"}, no markdown."
# NB: main() passes key="keep" to panel_vote(), so the JSON key here must stay "keep".
KEEP_SYSTEM = (
    "אתה עורך-לשון משפטי בכיר המנתח את סגנון-הכתיבה של יו\"ר ועדת ערר. בהינתן לקח-סגנון "
    "שחולץ מהשוואת טיוטה לגרסה הסופית, הכרע אם הוא ראוי להישמר כהנחיית-סגנון בת-הכללה "
    "לתיקים עתידיים. ראוי (keep=true) = תובנה מופשטת על קול/טון/מבנה/מקצב/ביטויי-מעבר/ניסוח "
    "שניתן להחילה על תיקים אחרים. לא-ראוי (keep=false) = מהות משפטית ספציפית (הלכה, עובדה, "
    "הכרעה תלוית-תיק), פרט חד-פעמי, או חזרה על תוכן ההחלטה ללא הפשטה סגנונית. "
    'החזר JSON בלבד: {"keep": true/false, "reason": "<משפט קצר>"}. ללא markdown.'
)
def _keep_user(change: dict) -> str:
desc = change.get("description") or ""
lesson = change.get("lesson") or ""
block = change.get("block_id") or ""
ctype = change.get("type") or ""
return (
f"סוג השינוי: {ctype}\n"
f"בלוק: {block}\n\n"
f"תיאור השינוי:\n{desc}\n\n"
f"הלקח שהוצע:\n{lesson}"
)
def _lesson_text(change: dict) -> str:
"""The text we would persist as a decision_lesson — prefer the distilled lesson."""
return (change.get("lesson") or change.get("description") or "").strip()
def _category(change: dict) -> str:
"""Map a distilled change to a decision_lessons.category (style/structure/lexicon/...)."""
t = (change.get("type") or "").lower()
block = (change.get("block_id") or "").lower()
if "transition" in t or "ביטוי" in t or "מעבר" in t:
return "lexicon"
if "structure" in t or "מבנה" in t or "order" in t:
return "structure"
if "table" in t or "טבל" in t:
return "tabular"
return "style"
# ── two judges, one signature: (system, user) -> dict|None ──
async def judge_deepseek(client: httpx.AsyncClient, system: str, user: str) -> dict | None:
    """Ask DeepSeek (deepseek-chat, temperature 0, JSON-object mode) for a verdict.

    Returns the JSON object parsed from the model's reply, or ``None`` when the
    key is missing or the request/parse fails. ``None`` is deliberate
    best-effort: panel_vote() then reports "incomplete" (escalated to the chair)
    instead of aborting the run. Failures are printed, so a bad key or an outage
    does not silently turn every lesson into "incomplete". The key travels in the
    Authorization header, so the printed error text does not contain it.
    """
    if not DEEPSEEK_KEY:
        return None
    try:
        r = await client.post(
            "https://api.deepseek.com/v1/chat/completions",
            headers={"Authorization": f"Bearer {DEEPSEEK_KEY}", "Content-Type": "application/json"},
            json={"model": "deepseek-chat", "temperature": 0, "max_tokens": 160,
                  "response_format": {"type": "json_object"},
                  "messages": [{"role": "system", "content": system},
                               {"role": "user", "content": user}]},
            timeout=90,
        )
        r.raise_for_status()
        return json.loads(r.json()["choices"][0]["message"]["content"])
    except Exception as exc:  # best-effort: one failed judge → "incomplete", not a crash
        print(f"⚠ deepseek judge failed: {type(exc).__name__}: {exc}", flush=True)
        return None
async def judge_gemini(client: httpx.AsyncClient, system: str, user: str) -> dict | None:
    """Ask Gemini (gemini-2.5-flash, temperature 0, JSON response) for a verdict.

    Returns the JSON parsed from the first candidate's first part, or ``None``
    when the key is missing or the request/parse fails. This is best-effort:
    panel_vote() then reports "incomplete". The API key is sent in the
    ``x-goog-api-key`` header rather than as a ``?key=`` query parameter. It
    therefore never appears in the request URL, and so not in the httpx error
    text printed on failure.
    """
    if not GEMINI_KEY:
        return None
    try:
        r = await client.post(
            "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent",
            headers={"Content-Type": "application/json", "x-goog-api-key": GEMINI_KEY},
            json={"system_instruction": {"parts": [{"text": system}]},
                  "contents": [{"parts": [{"text": user}]}],
                  # NOTE(review): far above DeepSeek's 160 — presumably headroom for
                  # 2.5-flash "thinking" tokens before the JSON; confirm before lowering.
                  "generationConfig": {"temperature": 0, "maxOutputTokens": 4000,
                                       "responseMimeType": "application/json"}},
            timeout=90,
        )
        r.raise_for_status()
        return json.loads(r.json()["candidates"][0]["content"]["parts"][0]["text"])
    except Exception as exc:  # best-effort: one failed judge → "incomplete", not a crash
        print(f"⚠ gemini judge failed: {type(exc).__name__}: {exc}", flush=True)
        return None
def _bool(d: dict | None, key: str) -> bool | None:
if not isinstance(d, dict) or key not in d:
return None
v = d[key]
if isinstance(v, bool):
return v
return str(v).strip().lower() in ("true", "1", "yes", "כן")
async def panel_vote(client, system, user, key) -> dict:
    """Query both judges concurrently and tally their votes on ``key``.

    Returns ``{"deepseek": bool|None, "gemini": bool|None, "_verdict": str}``,
    where ``_verdict`` is "agree_yes" / "agree_no" when both voted the same way,
    "split" when both voted but disagreed, and "incomplete" when either vote is
    missing (None).
    """
    ds_raw, gm_raw = await asyncio.gather(
        judge_deepseek(client, system, user),
        judge_gemini(client, system, user),
    )
    votes = {"deepseek": _bool(ds_raw, key), "gemini": _bool(gm_raw, key)}
    cast = [vote for vote in votes.values() if vote is not None]
    if len(cast) != 2:
        verdict = "incomplete"
    elif all(cast):
        verdict = "agree_yes"
    elif not any(cast):
        verdict = "agree_no"
    else:
        verdict = "split"
    votes["_verdict"] = verdict
    return votes
# ── inputs: pair (analysis.changes) + the case's style_corpus row ──
def _as_dict(v):
"""analysis/diff_stats come back from asyncpg jsonb as str — normalize to dict."""
if isinstance(v, str):
try:
return json.loads(v)
except Exception:
return {}
return v or {}
async def _resolve_corpus_id(decision_number: str) -> str | None:
    """Return the id (as str) of the newest style_corpus row for ``decision_number``.

    Lessons attach to style_corpus through a foreign key, so the case's final
    version must already be in the corpus; ``None`` means no such row exists.
    """
    query = (
        "SELECT id FROM style_corpus WHERE decision_number = $1 "
        "ORDER BY created_at DESC LIMIT 1"
    )
    pool = await db.get_pool()
    async with pool.acquire() as conn:
        newest = await conn.fetchrow(query, decision_number)
        if not newest:
            return None
        return str(newest["id"])
def _norm(text: str) -> str:
"""Normalize a lesson for dedup — collapse whitespace, strip."""
return " ".join((text or "").split())
async def _existing_lesson_texts(corpus_id: str) -> set[str]:
    """Return the _norm()-ed lesson_text of every decision_lesson on this corpus row.

    Lessons from any source are included. main() seeds its dedup set with the
    result, so a repeated ``--apply`` does not write a lesson twice.
    """
    pool = await db.get_pool()
    async with pool.acquire() as conn:
        stored = await conn.fetch(
            "SELECT lesson_text FROM decision_lessons WHERE style_corpus_id = $1",
            UUID(corpus_id),
        )
        normalized: set[str] = set()
        for record in stored:
            normalized.add(_norm(record["lesson_text"]))
        return normalized
async def _load_pair(args) -> dict | None:
    """Load the draft_final_pair to judge, by explicit id or by case number.

    With ``--pair-id`` the row is fetched directly. With ``--case`` the pair
    list is filtered by ``case_number``, and a match whose ``status`` is
    "analyzed" is preferred, since the CLI promises the *latest analyzed* pair.
    If no listed match carries that status (e.g. the list rows omit ``status``),
    the first match is used, as before. Returns ``None`` when nothing matches.
    """
    if args.pair_id:
        return await db.get_draft_final_pair(UUID(args.pair_id))
    # NOTE(review): assumes list_draft_final_pairs returns newest first and that
    # 500 rows reach this case — confirm against db.list_draft_final_pairs.
    pairs = await db.list_draft_final_pairs(limit=500)
    matches = [p for p in pairs if p.get("case_number") == args.case]
    if not matches:
        return None
    analyzed = [p for p in matches if p.get("status") == "analyzed"]
    chosen = (analyzed or matches)[0]
    # list_draft_final_pairs has no analysis; re-fetch the full row
    return await db.get_draft_final_pair(chosen["id"])
async def _judge_all(style_changes: list[dict], concurrency: int) -> list[dict]:
    """Run the two-judge panel over every change, at most ``concurrency`` at a time.

    Returns one vote dict per change (panel_vote() output plus ``_change``), in
    the same order as ``style_changes``, so the JSON report is deterministic.
    """
    sem = asyncio.Semaphore(concurrency)
    total = len(style_changes)
    done = 0

    async with httpx.AsyncClient() as client:
        async def run(ch: dict) -> dict:
            nonlocal done
            async with sem:
                v = await panel_vote(client, KEEP_SYSTEM, _keep_user(ch), "keep")
            v["_change"] = ch
            done += 1
            if done % concurrency == 0 or done == total:
                print(f"{done}/{total} judged", flush=True)
            return v

        # One gather for everything: the semaphore alone bounds concurrency, so a
        # slow judge call no longer stalls a whole fixed-size batch. gather()
        # preserves input order regardless of completion order.
        return list(await asyncio.gather(*(run(c) for c in style_changes)))


async def main(args: argparse.Namespace) -> int:
    """Run the style-lesson panel for one draft_final_pair and return an exit code.

    The run:
    1. Loads the pair (``--pair-id`` or ``--case``).
    2. Skips ``domain='substance'`` changes (INV-LRN5).
    3. Has both judges vote on the remaining changes.
    4. Prints a summary and writes per-lesson verdicts to
       /tmp/style_lesson_panel.json.

    With ``--apply``, 2/2-keep lessons are first deduplicated with _norm()
    against the case's style_corpus row and within the run. The survivors are
    backed up to a CSV under data/audit and then written as decision_lesson
    proposals. Returns 0 on success, and 1 when the pair is missing or
    unanalyzed or the corpus row is missing.
    """
    print(f"judges available — deepseek:{bool(DEEPSEEK_KEY)} gemini:{bool(GEMINI_KEY)}\n",
          flush=True)
    if not (DEEPSEEK_KEY and GEMINI_KEY):
        print("⚠ both DeepSeek and Gemini keys are required for the 2/2 panel.", flush=True)
    # asyncio.Semaphore(0) would block forever; degrade a bad value to serial judging.
    concurrency = max(1, args.concurrency or 1)
    pair = await _load_pair(args)
    if not pair:
        print(f"no draft_final_pair found for {args.pair_id or args.case}", flush=True)
        return 1
    if pair.get("status") != "analyzed" or not pair.get("analysis"):
        print(f"pair {pair['id']} not analyzed yet (status={pair.get('status')}). "
              f"Run ingest_final_version first.", flush=True)
        return 1
    analysis = _as_dict(pair["analysis"])
    changes = analysis.get("changes") or []
    case_number = pair.get("case_number") or args.case or ""
    # Only a positive limit truncates (a negative slice would silently drop the tail).
    if args.limit and args.limit > 0:
        changes = changes[: args.limit]
    # INV-LRN5: substance never enters the voice layer.
    substance = [c for c in changes if (c.get("domain") or "").lower() == "substance"]
    style_changes = [c for c in changes if (c.get("domain") or "").lower() != "substance"]
    print(f"pair {pair['id']} ({case_number}): {len(changes)} changes "
          f"{len(style_changes)} style / {len(substance)} substance(skipped)\n", flush=True)

    results = await _judge_all(style_changes, concurrency)

    cc = Counter(r["_verdict"] for r in results)
    print("\n" + "=" * 60)
    print(f"STYLE-LESSON PANEL {'(APPLY)' if args.apply else '(DRY-RUN — no DB writes)'}")
    print("=" * 60)
    print(f" ✓ keep (2/2): {cc['agree_yes']}")
    print(f" ✗ drop (2/2): {cc['agree_no']}")
    print(f" → CHAIR (split): {cc['split']}")
    print(f" ? incomplete: {cc['incomplete']}")
    print(f" ⊘ substance skipped: {len(substance)}")
    report = [{"verdict": r["_verdict"], "deepseek": r["deepseek"], "gemini": r["gemini"],
               "category": _category(r["_change"]), "lesson": _lesson_text(r["_change"])[:200]}
              for r in results]
    Path("/tmp/style_lesson_panel.json").write_text(
        json.dumps(report, ensure_ascii=False, indent=1))
    print("\nper-lesson verdicts → /tmp/style_lesson_panel.json")
    if not args.apply:
        print("\n(dry-run — pass --apply to write keep-lessons as decision_lesson proposals)")
        return 0
    # ── apply: write 2/2-keep as decision_lesson proposals (reversible) ──
    corpus_id = await _resolve_corpus_id(case_number)
    if not corpus_id:
        print(f"\n✗ no style_corpus row for decision_number={case_number!r}; cannot attach "
              f"lessons. Add the final to the style corpus first (document_upload_training).")
        return 1
    keeps = [r for r in results if r["_verdict"] == "agree_yes" and _lesson_text(r["_change"])]
    # Idempotency / dedup — skip keeps already attached to the corpus (any source),
    # and collapse duplicates WITHIN this run. Re-running --apply writes nothing new.
    existing = await _existing_lesson_texts(corpus_id)
    fresh, seen = [], set(existing)
    for r in keeps:
        n = _norm(_lesson_text(r["_change"]))
        if n in seen:
            continue
        seen.add(n)
        fresh.append(r)
    skipped_dup = len(keeps) - len(fresh)
    ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    audit = Path(__file__).resolve().parent.parent / "data" / "audit"
    audit.mkdir(parents=True, exist_ok=True)
    # CSV backup is written BEFORE any DB insert, so the exact rows are recoverable.
    backup = audit / f"style-panel-apply-{case_number}-{ts}.csv"
    with backup.open("w", encoding="utf-8", newline="") as f:
        w = csv.writer(f)
        w.writerow(["corpus_id", "category", "source", "lesson_text"])
        for r in fresh:
            w.writerow([corpus_id, _category(r["_change"]), "panel:deepseek+gemini",
                        _lesson_text(r["_change"])])
    written = 0
    for r in fresh:
        await db.add_decision_lesson(
            UUID(corpus_id),
            lesson_text=_lesson_text(r["_change"]),
            category=_category(r["_change"]),
            source="panel:deepseek+gemini",
            created_by="panel",
        )
        written += 1
    chair = cc["split"] + cc["incomplete"]
    print(f"\nAPPLIED (reversible): wrote {written} decision_lesson proposals "
          f"(source=panel:deepseek+gemini) · {skipped_dup} כפילויות דולגו · "
          f"{chair} escalated to chair · {len(substance)} substance skipped")
    print(f"backup → {backup}")
    print("NB: fold into SKILL.md / legal-decision-lessons.md stays a manual chair gate (INV-G10).")
    return 0
if __name__ == "__main__":
    ap = argparse.ArgumentParser(description=__doc__,
                                 formatter_class=argparse.RawDescriptionHelpFormatter)
    g = ap.add_mutually_exclusive_group(required=True)
    g.add_argument("--case", help="case_number — uses its latest analyzed draft_final_pair")
    g.add_argument("--pair-id", dest="pair_id", help="explicit draft_final_pairs.id")
    ap.add_argument("--apply", action="store_true", help="write keep-lessons (default: dry-run)")
    ap.add_argument("--limit", type=int, default=0,
                    help="consider only the first N distilled changes (0 = all)")
    ap.add_argument("--concurrency", type=int, default=4,
                    help="max lessons judged in parallel (>= 1)")
    cli_args = ap.parse_args()
    # Reject nonsensical values up front instead of deadlocking or mis-slicing later.
    if cli_args.limit < 0:
        ap.error("--limit must be >= 0")
    if cli_args.concurrency < 1:
        ap.error("--concurrency must be >= 1")
    raise SystemExit(asyncio.run(main(cli_args)))