feat(halacha): crash-safe incremental extraction + resume (A + resume)

Halacha extraction held ALL chunk results in memory and stored once at the very
end — a crash/interrupt mid-run (e.g. the 2026-05-31 freeze) lost everything and
re-paid the full LLM cost on retry.

Now each chunk's halachot are stored AND the chunk is checkpointed
(precedent_chunks.halacha_extracted_at) the moment it finishes:

- V25 schema: precedent_chunks.halacha_extracted_at (per-chunk checkpoint).
- db.store_halachot_for_chunk: atomic per-chunk insert (halacha_index continues
  from MAX, caller serializes via an in-process store-lock) + checkpoint mark.
- db.reset_halacha_extraction (force) / mark_all_chunks_extracted (legacy backfill).
- _extract_impl rewritten: resume by default (skip checkpointed chunks; failed
  chunks stay pending and are retried; status stays 'processing' until all done);
  force=True wipes + redoes all. reextract_halachot passes force=True; the queue
  drain (process_pending) resumes by default.
- Legacy guard: a pre-V25 precedent (halachot exist, no checkpoints) is
  backfilled and treated as complete — never re-extracted (would duplicate).

Verified on 9002-24 (55 halachot, legacy): resume → legacy-backfill, NO
duplication (stays 55), all chunks checkpointed. Index continuation: store at
55,56 after max 54, no collision. Tracks #72.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-31 21:27:46 +00:00
parent 6183e24316
commit 8e4ea23882
3 changed files with 222 additions and 93 deletions

View File

@@ -342,9 +342,14 @@ async def _extract_chunk(
return [], False
async def extract(case_law_id: UUID | str) -> dict:
async def extract(case_law_id: UUID | str, force: bool = False) -> dict:
"""Extract halachot from an uploaded precedent — globally serialized.
``force=False`` (default) RESUMES: chunks already extracted (checkpointed)
are skipped, so a crash/interrupt never loses completed work or re-pays for
it. ``force=True`` wipes prior halachot + checkpoints and re-extracts all
(used by explicit re-extraction).
Takes a PostgreSQL advisory lock so only ONE extraction runs at a time
across ALL processes (agent retries + batch ``process_pending`` spawn
independent OS drivers; an in-process Semaphore can't see them). If another
@@ -374,7 +379,7 @@ async def extract(case_law_id: UUID | str) -> dict:
"case_law_id": str(case_law_id),
}
try:
return await _extract_impl(case_law_id)
return await _extract_impl(case_law_id, force=force)
finally:
await lock_conn.fetchval(
"SELECT pg_advisory_unlock($1)", _HALACHA_EXTRACT_LOCK_KEY,
@@ -383,11 +388,14 @@ async def extract(case_law_id: UUID | str) -> dict:
await pool.release(lock_conn)
async def _extract_impl(case_law_id: UUID) -> dict:
async def _extract_impl(case_law_id: UUID, force: bool = False) -> dict:
"""Core extraction (caller holds the global advisory lock for the duration).
Idempotent: replaces any existing halachot for this case_law_id.
All inserted rows start as ``review_status='pending_review'``.
Crash-safe + resumable: each chunk's halachot are stored AND the chunk is
checkpointed (``precedent_chunks.halacha_extracted_at``) the moment it
finishes. A crash/interrupt loses at most the in-flight chunk; a re-run
resumes — already-done chunks are skipped, failed/pending chunks retried.
``force=True`` wipes prior halachot + checkpoints and re-extracts all.
"""
record = await db.get_case_law(case_law_id)
if not record:
@@ -415,111 +423,135 @@ async def _extract_impl(case_law_id: UUID) -> dict:
await db.set_case_law_halacha_status(case_law_id, "completed")
return {"status": "no_chunks", "extracted": 0, "stored": 0}
await db.set_case_law_halacha_status(case_law_id, "processing")
await db.delete_halachot(case_law_id)
# force = clean slate; otherwise resume (skip already-checkpointed chunks).
if force:
await db.reset_halacha_extraction(case_law_id)
for c in chunks:
c["halacha_extracted_at"] = None
await db.set_case_law_halacha_status(case_law_id, "processing")
pending = [c for c in chunks if c.get("halacha_extracted_at") is None]
# Legacy guard: a precedent extracted before V25 has halachot but NO chunk
# checkpoints. Re-extracting (append-per-chunk) would DUPLICATE them. If
# nothing is checkpointed yet but halachot already exist, backfill the
# checkpoints and treat as complete instead of re-extracting.
if not force and len(pending) == len(chunks):
already = await db.list_halachot(case_law_id=case_law_id, limit=1)
if already:
await db.mark_all_chunks_extracted(case_law_id)
total = len(await db.list_halachot(case_law_id=case_law_id, limit=10_000))
await db.set_case_law_halacha_status(case_law_id, "completed")
logger.info(
"halacha_extractor: case_law=%s legacy-backfill — %d existing "
"halachot, checkpoints backfilled (no re-extract).",
case_law_id, total,
)
return {"status": "completed", "extracted": total, "stored": total,
"legacy_backfill": True, "total_chunks": len(chunks)}
if not pending:
# Resume found nothing left — every chunk already extracted.
total = len(await db.list_halachot(case_law_id=case_law_id, limit=10_000))
await db.set_case_law_halacha_status(case_law_id, "completed")
return {"status": "completed", "extracted": total, "stored": total,
"resumed": True, "total_chunks": len(chunks)}
full_text = record.get("full_text") or ""
citation = record.get("case_number", "")
court = record.get("court", "")
date_str = str(record.get("date") or "")
context = f"מקור: {citation}{court}, {date_str}"
idx_by_id = {c["id"]: i for i, c in enumerate(chunks)}
sem = asyncio.Semaphore(CHUNK_CONCURRENCY)
store_lock = asyncio.Lock() # serialize per-chunk stores (index continuity)
stored_total = 0
failed_chunks = 0
async def _bounded(idx: int, chunk_row: dict) -> tuple[list[dict], bool]:
async def _process(chunk_row: dict) -> None:
nonlocal stored_total, failed_chunks
async with sem:
return await _extract_chunk(
items, ok = await _extract_chunk(
chunk_row["content"], chunk_row["section_type"],
idx, len(chunks), context, is_binding,
idx_by_id[chunk_row["id"]], len(chunks), context, is_binding,
)
if not ok:
failed_chunks += 1 # leave chunk un-checkpointed → retried on resume
return
cleaned: list[dict] = []
for raw in items:
coerced = _coerce_halacha(raw, is_binding=is_binding)
if coerced is None:
continue
coerced["quote_verified"] = _verify_quote(
coerced["supporting_quote"], full_text,
)
cleaned.append(coerced)
if cleaned:
embed_inputs = [
f"{h['rule_statement']}{h['reasoning_summary']}".strip("")
for h in cleaned
]
try:
vectors = await embeddings.embed_texts(embed_inputs, input_type="document")
except Exception as e:
logger.error("halacha_extractor: embeddings failed: %s", e)
vectors = [None] * len(cleaned)
for h, vec in zip(cleaned, vectors):
h["embedding"] = vec
# Store this chunk's halachot AND checkpoint the chunk, atomically.
async with store_lock:
stored_total += await db.store_halachot_for_chunk(
case_law_id, chunk_row["id"], cleaned,
)
chunk_results = await asyncio.gather(
*[_bounded(i, c) for i, c in enumerate(chunks)]
)
raw_halachot: list[dict] = []
failed_chunks = 0
for items, ok in chunk_results:
raw_halachot.extend(items)
if not ok:
failed_chunks += 1
await asyncio.gather(*[_process(c) for c in pending])
# If most chunks failed (rate limit storm, claude_session crash, etc.)
# do NOT touch the DB status — leave it 'processing' so the caller can
# retry without the request falling out of the queue. The caller
# (`process_pending_extractions`) is responsible for either retrying or
# finalising the status as 'failed' after retries are exhausted. This
# is the bug that produced 317/10's silent `no_halachot` after a
# 129-chunk neighbour saturated the API.
failure_rate = failed_chunks / len(chunks) if chunks else 0
if failure_rate >= EXTRACTION_FAILURE_THRESHOLD and not raw_halachot:
logger.error(
"halacha_extractor: case_law=%s extraction_failed — "
"%d/%d chunks failed (rate=%.0f%%), no halachot retrieved. "
"DB status left as 'processing' for caller-level retry.",
case_law_id, failed_chunks, len(chunks), failure_rate * 100,
# Decide final status from what's LEFT (re-read checkpoints).
after = await db.list_precedent_chunks(case_law_id, section_types=EXTRACTABLE_SECTIONS)
if not after:
after = await db.list_precedent_chunks(case_law_id)
still_pending = sum(1 for c in after if c.get("halacha_extracted_at") is None)
total = len(await db.list_halachot(case_law_id=case_law_id, limit=10_000))
if still_pending:
# Some chunks failed this run. Leave status 'processing' so a resume
# continues them (no progress is lost — done chunks are checkpointed).
if total == 0 and failed_chunks >= len(pending) * EXTRACTION_FAILURE_THRESHOLD:
logger.error(
"halacha_extractor: case_law=%s extraction_failed — %d/%d pending "
"chunks failed, 0 stored. status left 'processing' for retry.",
case_law_id, failed_chunks, len(pending),
)
return {"status": "extraction_failed", "extracted": 0, "stored": 0,
"failed_chunks": failed_chunks, "pending_chunks": still_pending,
"total_chunks": len(chunks)}
logger.warning(
"halacha_extractor: case_law=%s partial — %d chunks still pending, "
"%d halachot stored so far. status 'processing' (resume to finish).",
case_law_id, still_pending, total,
)
return {
"status": "extraction_failed",
"extracted": 0,
"stored": 0,
"failed_chunks": failed_chunks,
"total_chunks": len(chunks),
}
return {"status": "partial", "extracted": total, "stored": stored_total,
"pending_chunks": still_pending, "total_chunks": len(chunks)}
if not raw_halachot:
await db.set_case_law_halacha_status(case_law_id, "completed")
return {
"status": "no_halachot",
"extracted": 0,
"stored": 0,
"failed_chunks": failed_chunks,
"total_chunks": len(chunks),
}
# Validate against the full text of the precedent for the quote check.
full_text = record.get("full_text") or ""
cleaned: list[dict] = []
for raw in raw_halachot:
coerced = _coerce_halacha(raw, is_binding=is_binding)
if coerced is None:
continue
coerced["quote_verified"] = _verify_quote(
coerced["supporting_quote"], full_text,
)
cleaned.append(coerced)
if not cleaned:
await db.set_case_law_halacha_status(case_law_id, "completed")
return {"status": "no_valid_halachot", "extracted": len(raw_halachot), "stored": 0}
# Embed rule_statement + reasoning_summary so semantic search hits the
# rule directly rather than the surrounding chunk centroid.
embed_inputs = [
f"{h['rule_statement']}{h['reasoning_summary']}".strip("")
for h in cleaned
]
try:
vectors = await embeddings.embed_texts(embed_inputs, input_type="document")
except Exception as e:
logger.error("halacha_extractor: embeddings failed: %s", e)
vectors = [None] * len(cleaned)
for halacha, vec in zip(cleaned, vectors):
halacha["embedding"] = vec
stored = await db.store_halachot(case_law_id, cleaned)
verified = sum(1 for h in cleaned if h["quote_verified"])
# All chunks done.
stored = total
verified = sum(1 for h in await db.list_halachot(case_law_id=case_law_id, limit=10_000)
if h.get("quote_verified"))
await db.set_case_law_halacha_status(case_law_id, "completed")
logger.info(
"halacha_extractor: case_law=%s extracted=%d cleaned=%d verified=%d stored=%d",
case_law_id, len(raw_halachot), len(cleaned), verified, stored,
"halacha_extractor: case_law=%s completed — %d halachot stored "
"(%d new this run), %d quote-verified, %d chunks",
case_law_id, total, stored_total, verified, len(chunks),
)
return {
"status": "completed",
"extracted": len(raw_halachot),
"valid": len(cleaned),
"extracted": total,
"verified": verified,
"stored": stored,
"stored_this_run": stored_total,
"total_chunks": len(chunks),
}