legal-ai/scripts/_pipeline_runtime.py
Chaim e7d8b24d7c feat(pipeline): durable execution for final_halacha via LangGraph (P0, X16/INV-DUR1, #114)
scripts/_pipeline_runtime.py: shared durability runtime. Wraps an async step list in a
linear LangGraph StateGraph with AsyncSqliteSaver (a checkpoint per step). After a
crash/OOM the run continues from the failed step instead of re-running everything.
Graceful degradation: without langgraph → linear run as before (the button does not
break). One implementation for both pipelines (G2).

final_halacha_pipeline.py: the 4 steps ([0]extract [1]citations [2]corroboration [3]panel)
run through the runtime. CLI unchanged, plus --fresh (default: auto-resume). Stable thread
per case; dry-run = separate preview (always fresh). Crash in the panel [3] → resume from
[3] (steps 0-2 are saved).
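
The CLI behaviour described above (auto-resume by default, `--fresh` to force a restart, a stable thread per case, and a separate always-fresh thread for dry-run) could be wired roughly as follows. This is only a sketch: the positional `case_id` argument, the `--dry-run` flag spelling, the `thread_id_for` helper, and the `final_halacha:` prefix are hypothetical names and are not taken from the actual script.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="final_halacha_pipeline")
    parser.add_argument("case_id")  # hypothetical positional argument
    parser.add_argument(
        "--fresh",
        action="store_true",
        help="ignore any saved checkpoint and start from step [0]",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="preview only; uses its own thread and always starts fresh",
    )
    return parser


def thread_id_for(case_id: str, dry_run: bool) -> str:
    # Hypothetical naming scheme: the same case always maps to the same
    # thread, and a dry-run gets a distinct thread so it never touches the
    # checkpoint of a real run.
    suffix = ":dry-run" if dry_run else ""
    return f"final_halacha:{case_id}{suffix}"


def run_options(argv: list[str]) -> dict:
    """Translate CLI arguments into the keyword arguments a runtime would need."""
    args = build_parser().parse_args(argv)
    return {
        "thread_id": thread_id_for(args.case_id, args.dry_run),
        "fresh": args.fresh or args.dry_run,  # dry-run is always fresh
    }
```

With this shape, omitting both flags yields `fresh=False`, which is what lets an interrupted run pick up where it stopped.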

pyproject: extra "durable" (langgraph + langgraph-checkpoint-sqlite), host-only,
optional. data/checkpoints/ is in .gitignore.

Boundary (X16 §1): LangGraph only as the script's internal engine, not an orchestrator
(no parallel path to Paperclip; G2/G12). #108 (atomic extract) preceded this as a
prerequisite.

Verification: test_pipeline_runtime.py. With langgraph (temporary venv): 3 passed (resume
skips completed steps · fresh re-runs · linear). Without langgraph (shared venv): 1 passed
+ 2 skipped (degradation). final_halacha compiles and imports cleanly in both modes.
End-to-end run on the live pipeline (DB+LLM): after `pip install -e ".[durable]"` in the
main tree.
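
The behaviours checked above (resume skips completed steps, fresh re-runs everything) can be pictured with a toy stand-in. The sketch below does not use LangGraph or the real `run_pipeline`, and it is not the content of `test_pipeline_runtime.py`; `ToyRunner` is a hypothetical in-memory substitute that remembers which steps finished per thread, used only to illustrate the expected call counts.

```python
import asyncio
from collections import Counter


class ToyRunner:
    """Hypothetical in-memory stand-in for a per-thread step checkpointer."""

    def __init__(self) -> None:
        self.done = {}  # thread_id -> {step_name: output dict}

    async def run(self, steps, thread_id: str, fresh: bool = False) -> dict:
        finished = self.done.get(thread_id, {})
        if fresh or len(finished) == len(steps):
            finished = {}  # fresh request or already-completed thread: start over
        self.done[thread_id] = finished
        results: dict = {}
        for name, fn in steps:
            if name not in finished:
                # Recorded only after the step returns, so a crash leaves it pending.
                finished[name] = await fn(dict(results))
            results.update(finished[name])
        return results


calls: Counter = Counter()
crash_once = {"armed": True}


def make_step(name: str, fail_first: bool = False):
    async def step(results: dict) -> dict:
        calls[name] += 1
        if fail_first and crash_once["armed"]:
            crash_once["armed"] = False
            raise RuntimeError(f"simulated crash in {name}")
        return {name: "ok"}

    return name, step


STEPS = [
    make_step("extract"),
    make_step("citations"),
    make_step("corroboration"),
    make_step("panel", fail_first=True),
]


async def demo() -> dict:
    runner = ToyRunner()
    try:
        await runner.run(STEPS, "case-1")
    except RuntimeError:
        pass  # the first attempt dies in the panel step
    resumed = await runner.run(STEPS, "case-1")  # resumes at the panel step
    after_resume = dict(calls)
    await runner.run(STEPS, "case-1", fresh=True)  # re-runs all four steps
    return {"resumed": resumed, "after_resume": after_resume, "after_fresh": dict(calls)}


outcome = asyncio.run(demo())
```

After the resume, only the panel step has been called twice; the three earlier steps were each called once, which is the property the durable path is meant to guarantee.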

Invariants: INV-DUR1 (durability), G2 (single runtime), G3 (idempotency strengthened).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-10 09:52:35 +00:00

131 lines
5.0 KiB
Python

"""Durable execution runtime for the local one-shot pipelines (INV-DUR1 / X16).

Wraps an ordered list of named async steps in a LangGraph linear ``StateGraph``
with a SQLite checkpointer, so a crash / OOM / kill resumes from the last
COMPLETED step instead of re-running the whole pipeline (idempotency makes a
re-run *safe*; durability makes it *not pay twice*).

Shared by ``final_halacha_pipeline.py`` and ``final_learning_pipeline.py``: one
implementation, not one-per-script (G2).

Graceful degradation: if ``langgraph`` is not installed (e.g. the shared venv
hasn't been updated yet), the steps run LINEARLY, exactly as before, with a
warning. The production button (run-halacha / run-learning, driven by Hermes)
never breaks waiting on the dependency; it simply gains durable resume once
``langgraph`` + ``langgraph-checkpoint-sqlite`` are present.

Scope (X16 §1): LangGraph is used ONLY as the internal engine of these local
scripts, never as an agent-platform orchestrator (that would create a parallel
path to Paperclip, breaking G2/G12). HITL stays with the chair gates / Paperclip.

A "step" is ``Step(name, run)`` where ``run`` is an async callable taking the
accumulated results dict and returning a dict to merge into it (typically
``{<something>: <summary>}``). The step's real side-effects (DB writes, the LLM
panel) happen inside ``run``; LangGraph checkpoints *that the node finished* so a
resume skips it.
"""
from __future__ import annotations

import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Annotated, Any, Awaitable, Callable, TypedDict

logger = logging.getLogger(__name__)

StepFn = Callable[[dict], Awaitable[dict]]


@dataclass(frozen=True)
class Step:
    name: str
    run: StepFn


def _merge(a: dict, b: dict) -> dict:
    return {**a, **b}


async def _run_linear(steps: list[Step]) -> dict:
    """Fallback: run steps in order with no checkpointing (pre-X16 behaviour)."""
    results: dict[str, Any] = {}
    for step in steps:
        out = await step.run(results)
        if out:
            results.update(out)
    return results


async def run_pipeline(
    steps: list[Step],
    *,
    thread_id: str,
    checkpoint_db: str | Path,
    resume: bool = True,
    fresh: bool = False,
) -> dict:
    """Run ``steps`` in order with durable checkpointing keyed by ``thread_id``.

    * A brand-new ``thread_id`` (or ``fresh=True``) runs from the first step.
    * An INCOMPLETE thread (a prior run crashed mid-way) is RESUMED: completed
      steps are skipped, execution continues from the failed step.
    * A COMPLETED thread re-run (idempotent re-extraction) starts fresh: the
      stale checkpoint is cleared first so step-accumulators don't double-count.

    ``resume`` is accepted but not consulted below; ``fresh`` alone decides
    whether a prior checkpoint is ignored.

    Returns the accumulated results dict: the merge of every step's returned dict.
    """
    try:
        from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
        from langgraph.graph import END, START, StateGraph
    except Exception as e:  # noqa: BLE001  # any import failure → safe linear path
        logger.warning(
            "langgraph unavailable (%s); running %d steps LINEARLY without "
            "durable checkpointing (X16/INV-DUR1 inactive; install langgraph + "
            "langgraph-checkpoint-sqlite to enable resume).",
            e, len(steps),
        )
        return await _run_linear(steps)

    class State(TypedDict):
        # Each node returns a partial {"results": ...}; _merge folds it into the total.
        results: Annotated[dict, _merge]

    def _make_node(step: Step):
        # Factory so each node closes over its own step (no late-binding surprises).
        async def _node(state: State) -> dict:
            out = await step.run(state.get("results", {}))
            return {"results": out or {}}

        return _node

    # Build the linear chain START -> step[0] -> ... -> step[n-1] -> END.
    graph = StateGraph(State)
    prev = START
    for step in steps:
        graph.add_node(step.name, _make_node(step))
        graph.add_edge(prev, step.name)
        prev = step.name
    graph.add_edge(prev, END)

    checkpoint_db = Path(checkpoint_db)
    checkpoint_db.parent.mkdir(parents=True, exist_ok=True)
    config = {"configurable": {"thread_id": thread_id}}

    async with AsyncSqliteSaver.from_conn_string(str(checkpoint_db)) as saver:
        app = graph.compile(checkpointer=saver)
        snapshot = await app.aget_state(config)
        ran = (snapshot.values or {}).get("results", {}) if snapshot else {}
        # Incomplete = some results were saved AND nodes are still pending.
        incomplete = bool(ran) and tuple(snapshot.next or ()) != ()
        if not fresh and incomplete:
            logger.info(
                "pipeline %s: resuming from %s (%d step(s) already done: %s)",
                thread_id, snapshot.next, len(ran), ", ".join(ran),
            )
            # Passing None as input continues from the saved checkpoint.
            final = await app.ainvoke(None, config)
        else:
            if snapshot and (snapshot.values or {}):
                # stale/completed checkpoint: clear so this is a true fresh run.
                await saver.adelete_thread(thread_id)
                if fresh and ran:
                    logger.info("pipeline %s: --fresh cleared prior checkpoint", thread_id)
            final = await app.ainvoke({"results": {}}, config)
    return (final or {}).get("results", {})
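
The branch taken at the end of `run_pipeline` depends on three inputs: whether the saved snapshot holds any results, whether it still has pending nodes, and the `fresh` flag. The pure function below restates that decision outside LangGraph so the cases are easy to check. `decide` and its string return values are hypothetical names introduced only for this illustration.

```python
def decide(ran: dict, pending: tuple, fresh: bool) -> str:
    """Mirror the resume-or-restart branch of run_pipeline (illustrative only).

    ran     -- the "results" dict stored in the thread's latest checkpoint
    pending -- snapshot.next: node names still to run (empty when finished)
    fresh   -- the caller asked to ignore any prior checkpoint
    """
    incomplete = bool(ran) and tuple(pending or ()) != ()
    if not fresh and incomplete:
        return "resume"
    return "restart"


cases = {
    "new thread": decide({}, (), fresh=False),
    "crashed in panel": decide({"extract": 1, "citations": 2}, ("panel",), fresh=False),
    "crashed, --fresh": decide({"extract": 1}, ("citations",), fresh=True),
    "completed thread": decide({"extract": 1, "panel": 4}, (), fresh=False),
    # No step had merged a result yet, so `ran` is empty and the run restarts.
    "crashed in first step": decide({}, ("extract",), fresh=False),
}
```

One consequence of this rule is that a crash before any step has merged a non-empty result counts as a restart rather than a resume, since there are no saved results to skip.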