feat(pipeline): durable execution for final_learning via shared runtime (P1, X16/INV-DUR1, #115)
All checks were successful
G12 Leak-Guard / leak-guard (pull_request) Successful in 6s
All checks were successful
G12 Leak-Guard / leak-guard (pull_request) Successful in 6s
מחיל את scripts/_pipeline_runtime.py (מ-P0) על final_learning_pipeline: 3 הצעדים
([1]ingest/Opus-distillation [2]enroll-style-corpus [3]style-panel) רצים דרך אותו
runtime עמידות — מימוש אחד לשני הפייפליינים (G2), לא מימוש מקביל.
קריסה/OOM בפאנל-הסגנון [3] ממשיכה מ-[3] במקום לשלם שוב על דיסטילציית-ה-Opus [1]
(היקרה). thread יציב לכל תיק (learning:{case}); dry-run = preview נפרד. CLI זהה +
--fresh. שגיאת ingest קריטית → raise → halt + clean non-zero exit (resume מנסה שוב).
degradation חיננית כמו ב-P0 (ללא langgraph → ליניארי).
אימות: py_compile OK; מיובא נקי ב-venv המשותף (langgraph נעדר, lazy import). מנגנון
ה-runtime עצמו מכוסה ב-test_pipeline_runtime.py (P0) — אותו runtime.
Invariants: INV-DUR1 (עמידות), G2 (runtime יחיד), G3 (idempotency).
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -17,6 +17,13 @@ Steps:
|
||||
The fold into SKILL.md / legal-decision-lessons.md stays a manual chair gate.
|
||||
Local-only. Idempotent — safe to re-run.
|
||||
|
||||
Durable (X16 / INV-DUR1): the 3 steps run through scripts/_pipeline_runtime.py
|
||||
(shared with final_halacha) with a per-case checkpoint thread in one SQLite DB
|
||||
(data/checkpoints/learning.sqlite). A crash/OOM in the long style panel [3]
|
||||
RESUMES from [3] instead of re-paying the Opus distillation [1]. Default =
|
||||
auto-resume; ``--fresh`` (or ``--dry-run``) forces a clean run. Needs the host extra
|
||||
``pip install -e ".[durable]"``; without it the steps run linearly (as before).
|
||||
|
||||
cd ~/legal-ai/mcp-server
|
||||
.venv/bin/python ../scripts/final_learning_pipeline.py --case 8126-03-25
|
||||
"""
|
||||
@@ -29,9 +36,10 @@ import sys
|
||||
from argparse import Namespace
|
||||
from pathlib import Path
|
||||
|
||||
# scripts/ is not a package — make style_lesson_panel importable.
|
||||
# scripts/ is not a package — make style_lesson_panel + the runtime importable.
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent))
|
||||
|
||||
import _pipeline_runtime # noqa: E402 — durable runtime (X16); scripts/ on sys.path
|
||||
from legal_mcp import config # noqa: E402
|
||||
from legal_mcp.services import db # noqa: E402
|
||||
from legal_mcp.tools.documents import document_upload_training # noqa: E402
|
||||
@@ -81,30 +89,37 @@ async def main(args: argparse.Namespace) -> int:
|
||||
return 1
|
||||
print(f"final: {final_path}\n")
|
||||
|
||||
# [1] distillation (Opus) — skip if already analyzed (idempotent; --force to redo)
|
||||
status = await _latest_pair_status(case["id"])
|
||||
if status == "analyzed" and not args.force:
|
||||
print(f"[1/3] ingest_final_version — דולג (הזוג כבר analyzed; --force לחידוש)")
|
||||
else:
|
||||
# The 3 steps as durable nodes (X16 / INV-DUR1) — shared runtime with
|
||||
# final_halacha (scripts/_pipeline_runtime.py). A crash/OOM in the long style
|
||||
# panel [3] resumes from [3] instead of re-paying the Opus distillation [1].
|
||||
|
||||
async def step_ingest(results: dict) -> dict:
|
||||
# [1] distillation (Opus) — skip if already analyzed (idempotent; --force to redo).
|
||||
status = await _latest_pair_status(case["id"])
|
||||
if status == "analyzed" and not args.force:
|
||||
print("[1/3] ingest_final_version — דולג (הזוג כבר analyzed; --force לחידוש)")
|
||||
return {"ingest": "skipped:analyzed"}
|
||||
print("[1/3] ingest_final_version — דיסטילציית טיוטה↔סופי…", flush=True)
|
||||
raw = await ingest_final_version(case_number, file_path=final_path)
|
||||
try:
|
||||
env = json.loads(raw)
|
||||
if env.get("status") == "error":
|
||||
print(f" ✗ {env.get('message')}")
|
||||
return 1
|
||||
d = env.get("data", {})
|
||||
ds = d.get("diff_stats", {})
|
||||
print(f" ✓ change {ds.get('change_percent')}% · lessons {d.get('lessons_count')} "
|
||||
f"· new_expr {d.get('new_expressions')}")
|
||||
except Exception:
|
||||
print(f" (ingest returned: {raw[:200]})")
|
||||
return {"ingest": "unparsed"}
|
||||
if env.get("status") == "error": # fatal — halt (resume retries)
|
||||
raise RuntimeError(f"ingest_final_version failed: {env.get('message')}")
|
||||
d = env.get("data", {})
|
||||
ds = d.get("diff_stats", {})
|
||||
print(f" ✓ change {ds.get('change_percent')}% · lessons {d.get('lessons_count')} "
|
||||
f"· new_expr {d.get('new_expressions')}")
|
||||
return {"ingest": "done"}
|
||||
|
||||
# [2] enroll into style_corpus (idempotent) — lessons need a corpus_id
|
||||
print("[2/3] רישום לקורפוס-הסגנון (idempotent)…", flush=True)
|
||||
if await _has_style_corpus(case_number):
|
||||
print(" ✓ כבר רשום בקורפוס-הסגנון")
|
||||
else:
|
||||
async def step_enroll(results: dict) -> dict:
|
||||
# [2] enroll into style_corpus (idempotent) — lessons need a corpus_id.
|
||||
print("[2/3] רישום לקורפוס-הסגנון (idempotent)…", flush=True)
|
||||
if await _has_style_corpus(case_number):
|
||||
print(" ✓ כבר רשום בקורפוס-הסגנון")
|
||||
return {"enroll": "exists"}
|
||||
r = await document_upload_training(
|
||||
final_path,
|
||||
decision_number=case_number,
|
||||
@@ -116,17 +131,38 @@ async def main(args: argparse.Namespace) -> int:
|
||||
print(f" ✓ corpus_id {json.loads(r).get('data', {}).get('corpus_id')}")
|
||||
except Exception:
|
||||
print(f" (training upload returned: {r[:160]})")
|
||||
return {"enroll": "done"}
|
||||
|
||||
# [3] two-judge style panel (DeepSeek + Gemini)
|
||||
apply = not args.dry_run
|
||||
print(f"[3/3] פאנל-סגנון דו-סוכני (DeepSeek+Gemini) {'--apply' if apply else '(dry-run)'}…",
|
||||
flush=True)
|
||||
import style_lesson_panel as slp
|
||||
rc = await slp.main(Namespace(
|
||||
case=case_number, pair_id=None, apply=apply, limit=0, concurrency=4,
|
||||
))
|
||||
async def step_panel(results: dict) -> dict:
|
||||
# [3] two-judge style panel (DeepSeek + Gemini) — the long step durability protects.
|
||||
apply = not args.dry_run
|
||||
print(f"[3/3] פאנל-סגנון דו-סוכני (DeepSeek+Gemini) {'--apply' if apply else '(dry-run)'}…",
|
||||
flush=True)
|
||||
import style_lesson_panel as slp
|
||||
rc = await slp.main(Namespace(
|
||||
case=case_number, pair_id=None, apply=apply, limit=0, concurrency=4,
|
||||
))
|
||||
return {"panel_rc": rc or 0}
|
||||
|
||||
steps = [
|
||||
_pipeline_runtime.Step("ingest_final_version", step_ingest),
|
||||
_pipeline_runtime.Step("enroll_style_corpus", step_enroll),
|
||||
_pipeline_runtime.Step("style_panel", step_panel),
|
||||
]
|
||||
checkpoint_db = config.DATA_DIR / "checkpoints" / "learning.sqlite"
|
||||
thread_id = f"learning:{case_number}" + (":dryrun" if args.dry_run else "")
|
||||
try:
|
||||
results = await _pipeline_runtime.run_pipeline(
|
||||
steps,
|
||||
thread_id=thread_id,
|
||||
checkpoint_db=checkpoint_db,
|
||||
fresh=bool(args.fresh) or args.dry_run,
|
||||
)
|
||||
except Exception as e: # fatal step (e.g. ingest error) — clean non-zero exit
|
||||
print(f"\n✗ pipeline-למידה נכשל: {e}")
|
||||
return 1
|
||||
print("\n✓ pipeline-למידה הושלם" + (" (dry-run)" if args.dry_run else ""))
|
||||
return rc or 0
|
||||
return int(results.get("panel_rc", 0) or 0)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
@@ -137,4 +173,7 @@ if __name__ == "__main__":
|
||||
help="run the chain but the style panel in dry-run (no decision_lesson writes)")
|
||||
ap.add_argument("--force", action="store_true",
|
||||
help="re-run ingest_final_version even if the pair is already analyzed")
|
||||
ap.add_argument("--fresh", action="store_true",
|
||||
help="ignore any incomplete checkpoint and run from step [1] "
|
||||
"(default: auto-resume an interrupted run; X16/INV-DUR1)")
|
||||
raise SystemExit(asyncio.run(main(ap.parse_args())))
|
||||
|
||||
Reference in New Issue
Block a user