legal-ai/scripts/final_learning_pipeline.py
Chaim 959cb093b4
feat(learning): per-case indicator for voice-learning status + halacha extraction after a final decision
After a final decision was uploaded and the two automatic pipelines ran (voice
learning, halacha extraction/verification), the case did not show whether each
process had run, whether it succeeded, or why it failed. In particular, the empty
chair_name fault (2026-06-12) silently breaks the case_law copy, so halacha
extraction never starts at all and nothing makes that visible. Two indicators are
now shown next to the run buttons.

Backend (derived from a single source, no parallel tracking path):
- SCHEMA_V36: draft_final_pairs.learning_run (JSONB), a record field on the pairing
  ledger (INV-LRN4) that stamps the outcome of the learning-pipeline run
  (succeeded/failed + reason + at).
- set_learning_run_outcome(): stamps success/failure on the latest pair.
- case_learning_status(): read-only derivation from draft_final_pairs/style_corpus/
  decision_lessons/case_law/halachot: did it run? did it succeed? if not, why? how
  many halachot were extracted?
- final_learning_pipeline.py: stamps the outcome on success and in the except branch
  (surfaced, not swallowed).
- Exposure: case_get adds learning_status (→MCP + /api/cases/{case}/details) plus a
  dedicated endpoint GET /api/cases/{case}/learning-status (same function, no
  duplication).
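
A minimal sketch of the single-source idea in the backend list above: the run outcome is stamped on the pair record itself, and the status is derived read-only from existing rows. This is an illustration with in-memory dicts, not the project's implementation. The helper names (`stamp_outcome`, `derive_status`), the keys inside `learning_run`, and the `review_status` field are assumptions; only `learning_run`, `created_at`, and the succeeded/failed outcomes come from the commit message and the script.

```python
from __future__ import annotations

from datetime import datetime, timezone


def stamp_outcome(pair: dict, outcome: str, *, error: str | None = None) -> dict:
    """Write the run outcome onto the pair record itself (no side table).

    The key names inside learning_run are assumed for illustration.
    """
    pair["learning_run"] = {
        "outcome": outcome,  # "succeeded" or "failed"
        "error": error,      # failure reason, if any
        "at": datetime.now(timezone.utc).isoformat(),
    }
    return pair


def derive_status(pairs: list[dict], halachot: list[dict]) -> dict:
    """Read-only derivation: summarises existing rows and never writes."""
    if not pairs:
        return {"voice": "not_run", "reason": None, "halachot": {}}
    # The latest pair is the one the pipeline stamps.
    latest = max(pairs, key=lambda p: p["created_at"])
    run = latest.get("learning_run") or {}
    # Count halachot per review state (hypothetical field name).
    counts: dict[str, int] = {}
    for h in halachot:
        counts[h["review_status"]] = counts.get(h["review_status"], 0) + 1
    return {
        "voice": run.get("outcome", "not_run"),
        "reason": run.get("error"),
        "halachot": counts,
    }
```

Because the status is computed from the stamped field on every read, there is no second table to drift out of sync with the ledger, which appears to be the point of the "no parallel path" constraint.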

UI (approved through the Claude Design design gate, card 21-final-learning-status):
- useCaseLearningStatus (api/learning.ts): hook + gentle polling while a run is
  in flight.
- LearningStatusBadges: 2 rows (voice learning / halacha extraction), each with a
  badge + sub-line (lesson count · corpus enrollment / halacha count + breakdown
  approved/pending/rejected / failure reason).
- Integrated into drafts-panel under "Chair's final decision" + invalidation on the
  run buttons.

Verified against the live DB: succeeded + 5 halachot (8174-12-24) · entered but
pending (1200-12-25) · not entered into the corpus (8125-09-24) · failure-stamp
round-trip. tsc/eslint clean.

Invariants: G1 (normalize at the source: a derivation, not a patch), G2 (no parallel
path: a field on the existing ledger + a single exposer), INV-LRN4 (pairing ledger),
INV-IA1 (single source of truth).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-12 10:50:12 +00:00

198 lines
9.3 KiB
Python

#!/usr/bin/env python3
"""One-shot LOCAL pipeline for the 'run-learning' button (voice learning).

The container can't run the LLM steps (claude/DeepSeek/Gemini keys are local), so
the /api/cases/{case}/final/run-learning endpoint wakes the Hermes curator, which
runs THIS single deterministic command. Collapsing the flow into one script (rather
than asking the agent to assemble several tool calls) makes the autonomous path
reliable.

Steps:
  [1] ingest_final_version(case, file_path) → Opus distils draft↔final into
      draft_final_pairs.analysis (status→analyzed). INV-LRN5 separates style↔substance.
  [2] enroll the final into style_corpus (idempotent) so lessons have a corpus_id.
  [3] style_lesson_panel --apply → DeepSeek+Gemini vote per style lesson; 2/2-keep →
      decision_lesson (source=panel:deepseek+gemini); split → chair (INV-G10).

The fold into SKILL.md / legal-decision-lessons.md stays a manual chair gate.
Local-only. Idempotent: safe to re-run.

Durable (X16 / INV-DUR1): the 3 steps run through scripts/_pipeline_runtime.py
(shared with final_halacha) with a SQLite checkpoint per case
(data/checkpoints/learning.sqlite). A crash/OOM in the long style panel [3]
RESUMES from [3] instead of re-paying the Opus distillation [1]. Default =
auto-resume; ``--fresh`` forces a clean run. Needs the host extra
``pip install -e ".[durable]"``; without it the steps run linearly (as before).

    cd ~/legal-ai/mcp-server
    .venv/bin/python ../scripts/final_learning_pipeline.py --case 8126-03-25
"""
from __future__ import annotations

import argparse
import asyncio
import json
import sys
from argparse import Namespace
from pathlib import Path

# scripts/ is not a package, so make style_lesson_panel + the runtime importable.
sys.path.insert(0, str(Path(__file__).resolve().parent))

import _pipeline_runtime  # noqa: E402 - durable runtime (X16); scripts/ on sys.path
from legal_mcp import config  # noqa: E402
from legal_mcp.services import db  # noqa: E402
from legal_mcp.tools.documents import document_upload_training  # noqa: E402
from legal_mcp.tools.workflow import ingest_final_version  # noqa: E402


def _resolve_final_path(case_number: str) -> str | None:
    """The canonical final saved by /final/upload, with a graceful fallback."""
    export_dir = config.find_case_dir(case_number) / "exports"
    canonical = export_dir / f"סופי-{case_number}.docx"
    if canonical.exists():
        return str(canonical)
    # Fallback: the first (sorted) final-decision export in the directory.
    cands = sorted(export_dir.glob("סופי-*.docx"))
    return str(cands[0]) if cands else None


async def _has_style_corpus(decision_number: str) -> bool:
    pool = await db.get_pool()
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            "SELECT 1 FROM style_corpus WHERE decision_number = $1 LIMIT 1",
            decision_number,
        )
    return bool(row)


async def _latest_pair_status(case_id) -> str | None:
    pool = await db.get_pool()
    async with pool.acquire() as conn:
        return await conn.fetchval(
            "SELECT status FROM draft_final_pairs WHERE case_id = $1 "
            "ORDER BY created_at DESC LIMIT 1",
            case_id,
        )


async def main(args: argparse.Namespace) -> int:
    case_number = args.case
    case = await db.get_case_by_number(case_number)
    if not case:
        print(f"✗ תיק {case_number} לא נמצא")
        return 1

    final_path = _resolve_final_path(case_number)
    if not final_path:
        print(f"✗ לא נמצא קובץ סופי ל-{case_number} (העלה דרך 'העלאת החלטה סופית של היו\"ר')")
        return 1
    print(f"final: {final_path}\n")

    # The 3 steps as durable nodes (X16 / INV-DUR1), sharing the runtime with
    # final_halacha (scripts/_pipeline_runtime.py). A crash/OOM in the long style
    # panel [3] resumes from [3] instead of re-paying the Opus distillation [1].

    async def step_ingest(results: dict) -> dict:
        # [1] distillation (Opus): skip if already analyzed (idempotent; --force to redo).
        status = await _latest_pair_status(case["id"])
        if status == "analyzed" and not args.force:
            print("[1/3] ingest_final_version — דולג (הזוג כבר analyzed; --force לחידוש)")
            return {"ingest": "skipped:analyzed"}
        print("[1/3] ingest_final_version — דיסטילציית טיוטה↔סופי…", flush=True)
        raw = await ingest_final_version(case_number, file_path=final_path)
        try:
            env = json.loads(raw)
        except Exception:
            print(f" (ingest returned: {raw[:200]})")
            return {"ingest": "unparsed"}
        if env.get("status") == "error":  # fatal: halt (resume retries)
            raise RuntimeError(f"ingest_final_version failed: {env.get('message')}")
        d = env.get("data", {})
        ds = d.get("diff_stats", {})
        print(f" ✓ change {ds.get('change_percent')}% · lessons {d.get('lessons_count')} "
              f"· new_expr {d.get('new_expressions')}")
        # Surface (do not swallow) a failed precedent-corpus copy so the final
        # does not silently miss the citable internal_committee library.
        if d.get("internal_corpus_ingested") is False:
            print(f" ⚠️ קורפוס-פסיקה: ההעתק הפנימי (internal_committee) לא נוצר — "
                  f"{d.get('internal_corpus_error', 'סיבה לא ידועה')}", flush=True)
        return {"ingest": "done"}

    async def step_enroll(results: dict) -> dict:
        # [2] enroll into style_corpus (idempotent); lessons need a corpus_id.
        print("[2/3] רישום לקורפוס-הסגנון (idempotent)…", flush=True)
        if await _has_style_corpus(case_number):
            print(" ✓ כבר רשום בקורפוס-הסגנון")
            return {"enroll": "exists"}
        r = await document_upload_training(
            final_path,
            decision_number=case_number,
            title=f"החלטה סופית — {case.get('proceeding_type', '')} {case_number}".strip(),
            practice_area=case.get("practice_area") or "appeals_committee",
            appeal_subtype=case.get("appeal_subtype") or "",
        )
        try:
            print(f" ✓ corpus_id {json.loads(r).get('data', {}).get('corpus_id')}")
        except Exception:
            print(f" (training upload returned: {r[:160]})")
        return {"enroll": "done"}

    async def step_panel(results: dict) -> dict:
        # [3] two-judge style panel (DeepSeek + Gemini): the long step durability protects.
        apply = not args.dry_run
        print(f"[3/3] פאנל-סגנון דו-סוכני (DeepSeek+Gemini) {'--apply' if apply else '(dry-run)'}",
              flush=True)
        import style_lesson_panel as slp
        rc = await slp.main(Namespace(
            case=case_number, pair_id=None, apply=apply, limit=0, concurrency=4,
        ))
        return {"panel_rc": rc or 0}

    steps = [
        _pipeline_runtime.Step("ingest_final_version", step_ingest),
        _pipeline_runtime.Step("enroll_style_corpus", step_enroll),
        _pipeline_runtime.Step("style_panel", step_panel),
    ]
    checkpoint_db = config.DATA_DIR / "checkpoints" / "learning.sqlite"
    thread_id = f"learning:{case_number}" + (":dryrun" if args.dry_run else "")
    try:
        results = await _pipeline_runtime.run_pipeline(
            steps,
            thread_id=thread_id,
            checkpoint_db=checkpoint_db,
            fresh=bool(args.fresh) or args.dry_run,
        )
    except Exception as e:  # fatal step (e.g. ingest error): clean non-zero exit
        print(f"\n✗ pipeline-למידה נכשל: {e}")
        # Stamp the explicit FAILURE outcome on the pair so the case shows why (a
        # crash otherwise leaves status='final_received', indistinguishable from
        # never-run). Skipped in dry-run. Surfaced, never swallowed.
        if not args.dry_run:
            try:
                await db.set_learning_run_outcome(case["id"], "failed", error=str(e))
            except Exception as stamp_err:
                print(f" ⚠️ לא ניתן לחתום תוצאת-כישלון: {stamp_err}")
        return 1

    print("\n✓ pipeline-למידה הושלם" + (" (dry-run)" if args.dry_run else ""))
    if not args.dry_run:
        try:
            await db.set_learning_run_outcome(case["id"], "succeeded", steps=results)
        except Exception as stamp_err:
            print(f" ⚠️ לא ניתן לחתום תוצאת-הצלחה: {stamp_err}")
    return int(results.get("panel_rc", 0) or 0)


if __name__ == "__main__":
    ap = argparse.ArgumentParser(description=__doc__,
                                 formatter_class=argparse.RawDescriptionHelpFormatter)
    ap.add_argument("--case", required=True, help="case_number, e.g. 8126-03-25")
    ap.add_argument("--dry-run", dest="dry_run", action="store_true",
                    help="run the chain but the style panel in dry-run (no decision_lesson writes)")
    ap.add_argument("--force", action="store_true",
                    help="re-run ingest_final_version even if the pair is already analyzed")
    ap.add_argument("--fresh", action="store_true",
                    help="ignore any incomplete checkpoint and run from step [1] "
                         "(default: auto-resume an interrupted run; X16/INV-DUR1)")
    raise SystemExit(asyncio.run(main(ap.parse_args())))
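
The durable behaviour the docstring describes (a per-case SQLite checkpoint, with a re-run resuming at the first unfinished step) can be sketched as follows. scripts/_pipeline_runtime.py is not shown on this page, so this is not its implementation: `run_with_checkpoints`, the `done` table, and the way step results are merged into one dict are all assumptions made for illustration, using only the standard library.

```python
import asyncio
import json
import sqlite3
from typing import Awaitable, Callable

StepFn = Callable[[dict], Awaitable[dict]]


async def run_with_checkpoints(
    steps: list[tuple[str, StepFn]],
    thread_id: str,
    db_path: str,
    fresh: bool = False,
) -> dict:
    """Run steps in order, skipping any already recorded for thread_id."""
    conn = sqlite3.connect(db_path)
    try:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS done ("
            "thread_id TEXT, step TEXT, result TEXT, "
            "PRIMARY KEY (thread_id, step))"
        )
        if fresh:
            # Forget earlier progress so the run starts from the first step.
            conn.execute("DELETE FROM done WHERE thread_id = ?", (thread_id,))
            conn.commit()
        results: dict = {}
        for name, fn in steps:
            row = conn.execute(
                "SELECT result FROM done WHERE thread_id = ? AND step = ?",
                (thread_id, name),
            ).fetchone()
            if row:
                # Step finished in an earlier run: reuse its stored result.
                results.update(json.loads(row[0]))
                continue
            out = await fn(results)  # an exception here leaves no checkpoint
            conn.execute(
                "INSERT INTO done (thread_id, step, result) VALUES (?, ?, ?)",
                (thread_id, name, json.dumps(out)),
            )
            conn.commit()
            results.update(out)
        return results
    finally:
        conn.close()
```

Committing after each step is what makes a crash in a later step cheap: the earlier, expensive step is already recorded, so the next invocation with the same `thread_id` only repeats the step that failed.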