"""Durable execution runtime for the local one-shot pipelines (INV-DUR1 / X16). Wraps an ordered list of named async steps in a LangGraph linear ``StateGraph`` with a SQLite checkpointer, so a crash / OOM / kill resumes from the last COMPLETED step instead of re-running the whole pipeline (idempotency makes a re-run *safe*; durability makes it *not pay twice*). Shared by ``final_halacha_pipeline.py`` and ``final_learning_pipeline.py`` — one implementation, not one-per-script (G2). Graceful degradation: if ``langgraph`` is not installed (e.g. the shared venv hasn't been updated yet), the steps run LINEARLY — exactly as before — with a warning. The production button (run-halacha / run-learning, driven by Hermes) never breaks waiting on the dependency; it simply gains durable resume once ``langgraph`` + ``langgraph-checkpoint-sqlite`` are present. Scope (X16 §1): LangGraph is used ONLY as the internal engine of these local scripts — never as an agent-platform orchestrator (that would create a parallel path to Paperclip, breaking G2/G12). HITL stays with the chair gates / Paperclip. A "step" is ``Step(name, run)`` where ``run`` is an async callable taking the accumulated results dict and returning a dict to merge into it (typically ``{: }``). The step's real side-effects (DB writes, the LLM panel) happen inside ``run``; LangGraph checkpoints *that the node finished* so a resume skips it. """ from __future__ import annotations import logging from dataclasses import dataclass from pathlib import Path from typing import Annotated, Any, Awaitable, Callable, TypedDict logger = logging.getLogger(__name__) StepFn = Callable[[dict], Awaitable[dict]] @dataclass(frozen=True) class Step: name: str run: StepFn def _merge(a: dict, b: dict) -> dict: return {**a, **b} async def _run_linear(steps: list[Step]) -> dict: """Fallback: run steps in order with no checkpointing (pre-X16 behaviour).""" results: dict[str, Any] = {} for step in steps: out = await step.run(results) if out: results.update(out) return results async def run_pipeline( steps: list[Step], *, thread_id: str, checkpoint_db: str | Path, resume: bool = True, fresh: bool = False, ) -> dict: """Run ``steps`` in order with durable checkpointing keyed by ``thread_id``. * A brand-new ``thread_id`` (or ``fresh=True``) runs from the first step. * An INCOMPLETE thread (a prior run crashed mid-way) is RESUMED — completed steps are skipped, execution continues from the failed step. * A COMPLETED thread re-run (idempotent re-extraction) starts fresh — the stale checkpoint is cleared first so step-accumulators don't double-count. Returns the accumulated results dict (``{step_name: , ...}``). """ try: from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver from langgraph.graph import END, START, StateGraph except Exception as e: # noqa: BLE001 — any import failure → safe linear path logger.warning( "langgraph unavailable (%s) — running %d steps LINEARLY without " "durable checkpointing (X16/INV-DUR1 inactive; install langgraph + " "langgraph-checkpoint-sqlite to enable resume).", e, len(steps), ) return await _run_linear(steps) class State(TypedDict): results: Annotated[dict, _merge] def _make_node(step: Step): async def _node(state: State) -> dict: out = await step.run(state.get("results", {})) return {"results": out or {}} return _node graph = StateGraph(State) prev = START for step in steps: graph.add_node(step.name, _make_node(step)) graph.add_edge(prev, step.name) prev = step.name graph.add_edge(prev, END) checkpoint_db = Path(checkpoint_db) checkpoint_db.parent.mkdir(parents=True, exist_ok=True) config = {"configurable": {"thread_id": thread_id}} async with AsyncSqliteSaver.from_conn_string(str(checkpoint_db)) as saver: app = graph.compile(checkpointer=saver) snapshot = await app.aget_state(config) ran = (snapshot.values or {}).get("results", {}) if snapshot else {} incomplete = bool(ran) and tuple(snapshot.next or ()) != () if not fresh and incomplete: logger.info( "pipeline %s — resuming from %s (%d step(s) already done: %s)", thread_id, snapshot.next, len(ran), ", ".join(ran), ) final = await app.ainvoke(None, config) else: if snapshot and (snapshot.values or {}): # stale/completed checkpoint — clear so this is a true fresh run. await saver.adelete_thread(thread_id) if fresh and ran: logger.info("pipeline %s — --fresh: cleared prior checkpoint", thread_id) final = await app.ainvoke({"results": {}}, config) return (final or {}).get("results", {})