feat(pipeline): durable execution for final_halacha via LangGraph (P0, X16/INV-DUR1, #114)
scripts/_pipeline_runtime.py — runtime עמידות משותף: עוטף רשימת-צעדים async ב-LangGraph StateGraph ליניארי עם AsyncSqliteSaver (checkpoint לכל צעד). קריסה/OOM ממשיכה מהצעד שנכשל במקום להריץ הכל מחדש. degradation חיננית: ללא langgraph → ריצה ליניארית כמו קודם (הכפתור לא נשבר). מימוש אחד לשני הפייפליינים (G2). final_halacha_pipeline.py — 4 הצעדים ([0]extract [1]citations [2]corroboration [3]panel) רצים דרך ה-runtime. CLI זהה + --fresh (ברירת-מחדל auto-resume). thread יציב לכל תיק; dry-run = preview נפרד (תמיד fresh). קריסה בפאנל [3] → resume מ-[3] (steps 0-2 שמורים). pyproject: extra "durable" (langgraph + langgraph-checkpoint-sqlite) — host-only, optional. data/checkpoints/ ב-.gitignore. גבול (X16 §1): LangGraph רק כמנוע-פנימי של הסקריפט — לא orchestrator (לא מסלול מקביל ל-Paperclip; G2/G12). #108 (atomic extract) קדם לזה כתנאי. אימות: test_pipeline_runtime.py — עם langgraph (venv-זמני): 3 passed (resume מדלג צעדים שהושלמו · fresh מריץ-מחדש · linear). בלי langgraph (venv משותף): 1 passed + 2 skipped (degradation). final_halacha מתקמפל ומיובא נקי בשני המצבים. הרצה end-to-end על הפייפליין החי (DB+LLM) — לאחר `pip install -e ".[durable]"` בעץ הראשי. Invariants: INV-DUR1 (עמידות), G2 (runtime יחיד), G3 (idempotency מחוזק). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
130
scripts/_pipeline_runtime.py
Normal file
130
scripts/_pipeline_runtime.py
Normal file
@@ -0,0 +1,130 @@
|
||||
"""Durable execution runtime for the local one-shot pipelines (INV-DUR1 / X16).
|
||||
|
||||
Wraps an ordered list of named async steps in a LangGraph linear ``StateGraph``
|
||||
with a SQLite checkpointer, so a crash / OOM / kill resumes from the last
|
||||
COMPLETED step instead of re-running the whole pipeline (idempotency makes a
|
||||
re-run *safe*; durability makes it *not pay twice*).
|
||||
|
||||
Shared by ``final_halacha_pipeline.py`` and ``final_learning_pipeline.py`` — one
|
||||
implementation, not one-per-script (G2).
|
||||
|
||||
Graceful degradation: if ``langgraph`` is not installed (e.g. the shared venv
|
||||
hasn't been updated yet), the steps run LINEARLY — exactly as before — with a
|
||||
warning. The production button (run-halacha / run-learning, driven by Hermes)
|
||||
never breaks waiting on the dependency; it simply gains durable resume once
|
||||
``langgraph`` + ``langgraph-checkpoint-sqlite`` are present.
|
||||
|
||||
Scope (X16 §1): LangGraph is used ONLY as the internal engine of these local
|
||||
scripts — never as an agent-platform orchestrator (that would create a parallel
|
||||
path to Paperclip, breaking G2/G12). HITL stays with the chair gates / Paperclip.
|
||||
|
||||
A "step" is ``Step(name, run)`` where ``run`` is an async callable taking the
|
||||
accumulated results dict and returning a dict to merge into it (typically
|
||||
``{<something>: <summary>}``). The step's real side-effects (DB writes, the LLM
|
||||
panel) happen inside ``run``; LangGraph checkpoints *that the node finished* so a
|
||||
resume skips it.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Annotated, Any, Awaitable, Callable, TypedDict
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
StepFn = Callable[[dict], Awaitable[dict]]
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Step:
|
||||
name: str
|
||||
run: StepFn
|
||||
|
||||
|
||||
def _merge(a: dict, b: dict) -> dict:
|
||||
return {**a, **b}
|
||||
|
||||
|
||||
async def _run_linear(steps: list[Step]) -> dict:
|
||||
"""Fallback: run steps in order with no checkpointing (pre-X16 behaviour)."""
|
||||
results: dict[str, Any] = {}
|
||||
for step in steps:
|
||||
out = await step.run(results)
|
||||
if out:
|
||||
results.update(out)
|
||||
return results
|
||||
|
||||
|
||||
async def run_pipeline(
|
||||
steps: list[Step],
|
||||
*,
|
||||
thread_id: str,
|
||||
checkpoint_db: str | Path,
|
||||
resume: bool = True,
|
||||
fresh: bool = False,
|
||||
) -> dict:
|
||||
"""Run ``steps`` in order with durable checkpointing keyed by ``thread_id``.
|
||||
|
||||
* A brand-new ``thread_id`` (or ``fresh=True``) runs from the first step.
|
||||
* An INCOMPLETE thread (a prior run crashed mid-way) is RESUMED — completed
|
||||
steps are skipped, execution continues from the failed step.
|
||||
* A COMPLETED thread re-run (idempotent re-extraction) starts fresh — the
|
||||
stale checkpoint is cleared first so step-accumulators don't double-count.
|
||||
|
||||
Returns the accumulated results dict (``{step_name: <return>, ...}``).
|
||||
"""
|
||||
try:
|
||||
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
|
||||
from langgraph.graph import END, START, StateGraph
|
||||
except Exception as e: # noqa: BLE001 — any import failure → safe linear path
|
||||
logger.warning(
|
||||
"langgraph unavailable (%s) — running %d steps LINEARLY without "
|
||||
"durable checkpointing (X16/INV-DUR1 inactive; install langgraph + "
|
||||
"langgraph-checkpoint-sqlite to enable resume).",
|
||||
e, len(steps),
|
||||
)
|
||||
return await _run_linear(steps)
|
||||
|
||||
class State(TypedDict):
|
||||
results: Annotated[dict, _merge]
|
||||
|
||||
def _make_node(step: Step):
|
||||
async def _node(state: State) -> dict:
|
||||
out = await step.run(state.get("results", {}))
|
||||
return {"results": out or {}}
|
||||
return _node
|
||||
|
||||
graph = StateGraph(State)
|
||||
prev = START
|
||||
for step in steps:
|
||||
graph.add_node(step.name, _make_node(step))
|
||||
graph.add_edge(prev, step.name)
|
||||
prev = step.name
|
||||
graph.add_edge(prev, END)
|
||||
|
||||
checkpoint_db = Path(checkpoint_db)
|
||||
checkpoint_db.parent.mkdir(parents=True, exist_ok=True)
|
||||
config = {"configurable": {"thread_id": thread_id}}
|
||||
|
||||
async with AsyncSqliteSaver.from_conn_string(str(checkpoint_db)) as saver:
|
||||
app = graph.compile(checkpointer=saver)
|
||||
snapshot = await app.aget_state(config)
|
||||
ran = (snapshot.values or {}).get("results", {}) if snapshot else {}
|
||||
incomplete = bool(ran) and tuple(snapshot.next or ()) != ()
|
||||
|
||||
if not fresh and incomplete:
|
||||
logger.info(
|
||||
"pipeline %s — resuming from %s (%d step(s) already done: %s)",
|
||||
thread_id, snapshot.next, len(ran), ", ".join(ran),
|
||||
)
|
||||
final = await app.ainvoke(None, config)
|
||||
else:
|
||||
if snapshot and (snapshot.values or {}):
|
||||
# stale/completed checkpoint — clear so this is a true fresh run.
|
||||
await saver.adelete_thread(thread_id)
|
||||
if fresh and ran:
|
||||
logger.info("pipeline %s — --fresh: cleared prior checkpoint", thread_id)
|
||||
final = await app.ainvoke({"results": {}}, config)
|
||||
|
||||
return (final or {}).get("results", {})
|
||||
Reference in New Issue
Block a user