From 8e4ea238820c7fc9b8ce2b47276b596774cf948e Mon Sep 17 00:00:00 2001 From: Chaim Date: Sun, 31 May 2026 21:27:46 +0000 Subject: [PATCH] feat(halacha): crash-safe incremental extraction + resume (A + resume) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Halacha extraction held ALL chunk results in memory and stored once at the very end — so a crash/interrupt mid-run (e.g. the 2026-05-31 freeze) lost everything, and the retry re-paid the full LLM cost. Now each chunk's halachot are stored AND the chunk is checkpointed (precedent_chunks.halacha_extracted_at) the moment it finishes: - V25 schema: precedent_chunks.halacha_extracted_at (per-chunk checkpoint). - db.store_halachot_for_chunk: atomic per-chunk insert (halacha_index continues from MAX, caller serializes via an in-process store-lock) + checkpoint mark. - db.reset_halacha_extraction (force) / mark_all_chunks_extracted (legacy backfill). - _extract_impl rewritten: resume by default (skip checkpointed chunks; failed chunks stay pending and are retried; status stays 'processing' until all done); force=True wipes + redoes all. reextract_halachot passes force=True; the queue drain (process_pending) resumes by default. - Legacy guard: a pre-V25 precedent (halachot exist, no checkpoints) is backfilled and treated as complete — never re-extracted (re-extraction would duplicate its halachot). Verified on 9002-24 (55 halachot, legacy): resume → legacy-backfill, NO duplication (stays 55), all chunks checkpointed. Index continuation: new rows stored at 55, 56 after max 54, no collision. Tracks #72. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- mcp-server/src/legal_mcp/services/db.py | 100 ++++++++- .../legal_mcp/services/halacha_extractor.py | 210 ++++++++++-------- .../legal_mcp/services/precedent_library.py | 5 +- 3 files changed, 222 insertions(+), 93 deletions(-) diff --git a/mcp-server/src/legal_mcp/services/db.py b/mcp-server/src/legal_mcp/services/db.py index e863758..ed70b29 100644 --- a/mcp-server/src/legal_mcp/services/db.py +++ b/mcp-server/src/legal_mcp/services/db.py @@ -1149,6 +1149,14 @@ CREATE TABLE IF NOT EXISTS halacha_citation_corroboration ( CREATE INDEX IF NOT EXISTS idx_hcc_halacha ON halacha_citation_corroboration(halacha_id); """ +SCHEMA_V25_SQL = """ +-- Crash-safe halacha extraction: per-chunk checkpoint enables incremental store +-- + resume. A chunk with halacha_extracted_at set has been processed; a resumed +-- run skips it (so a crash never loses completed chunks or re-pays for them). +ALTER TABLE precedent_chunks + ADD COLUMN IF NOT EXISTS halacha_extracted_at TIMESTAMPTZ; +""" + async def _run_schema_migrations(pool: asyncpg.Pool) -> None: async with pool.acquire() as conn: @@ -1177,7 +1185,8 @@ async def _run_schema_migrations(pool: asyncpg.Pool) -> None: await conn.execute(SCHEMA_V22_SQL) await conn.execute(SCHEMA_V23_SQL) await conn.execute(SCHEMA_V24_SQL) - logger.info("Database schema initialized (v1-v24)") + await conn.execute(SCHEMA_V25_SQL) + logger.info("Database schema initialized (v1-v25)") async def init_schema() -> None: @@ -3199,7 +3208,8 @@ async def list_precedent_chunks( pool = await get_pool() if section_types: rows = await pool.fetch( - """SELECT id, chunk_index, content, section_type, page_number + """SELECT id, chunk_index, content, section_type, page_number, + halacha_extracted_at FROM precedent_chunks WHERE case_law_id = $1 AND section_type = ANY($2::text[]) ORDER BY chunk_index""", @@ -3207,7 +3217,8 @@ async def list_precedent_chunks( ) else: rows = await pool.fetch( - """SELECT id, chunk_index, 
content, section_type, page_number + """SELECT id, chunk_index, content, section_type, page_number, + halacha_extracted_at FROM precedent_chunks WHERE case_law_id = $1 ORDER BY chunk_index""", @@ -3280,6 +3291,89 @@ async def store_halachot(case_law_id: UUID, halachot: list[dict]) -> int: return len(halachot) +async def reset_halacha_extraction(case_law_id: UUID) -> None: + """Force a clean re-extraction: wipe halachot + clear per-chunk checkpoints + so every chunk is re-processed (used by explicit re-extract, not resume).""" + pool = await get_pool() + async with pool.acquire() as conn: + async with conn.transaction(): + await conn.execute("DELETE FROM halachot WHERE case_law_id = $1", case_law_id) + await conn.execute( + "UPDATE precedent_chunks SET halacha_extracted_at = NULL " + "WHERE case_law_id = $1", case_law_id, + ) + + +async def mark_all_chunks_extracted(case_law_id: UUID) -> int: + """Checkpoint every un-marked chunk of a precedent as extracted. + + Used to backfill pre-V25 precedents (halachot already exist but no chunk was + checkpointed) so a resume run skips them instead of re-extracting (which + would duplicate). Returns rows updated. + """ + pool = await get_pool() + result = await pool.execute( + "UPDATE precedent_chunks SET halacha_extracted_at = now() " + "WHERE case_law_id = $1 AND halacha_extracted_at IS NULL", case_law_id, + ) + try: + return int(result.split()[-1]) + except (ValueError, IndexError): + return 0 + + +async def store_halachot_for_chunk( + case_law_id: UUID, chunk_id: UUID, halachot: list[dict], +) -> int: + """Persist ONE chunk's halachot and mark the chunk done — atomically. + + Crash-safe + resumable: each chunk's results land in the DB the moment it + finishes, and the chunk is flagged (``halacha_extracted_at``) so a resumed + run skips it. ``halacha_index`` continues from the current max so appends + across chunks never collide. 
The chunk is marked even when ``halachot`` is + empty (so resume skips genuinely-empty chunks too). Caller serializes calls + (a single in-process store-lock) so the MAX read stays race-free. + """ + threshold = config.HALACHA_AUTO_APPROVE_THRESHOLD + pool = await get_pool() + async with pool.acquire() as conn: + async with conn.transaction(): + base = await conn.fetchval( + "SELECT COALESCE(MAX(halacha_index), -1) + 1 FROM halachot " + "WHERE case_law_id = $1", case_law_id, + ) + for j, h in enumerate(halachot): + confidence = float(h.get("confidence", 0.0)) + auto_approve = confidence >= threshold + review_status = "approved" if auto_approve else "pending_review" + reviewer = ( + f"auto-approved (confidence ≥ {threshold:.2f})" + if auto_approve else None + ) + reviewed_at_clause = "now()" if auto_approve else "NULL" + await conn.execute( + f"""INSERT INTO halachot + (case_law_id, halacha_index, rule_statement, rule_type, + reasoning_summary, supporting_quote, page_reference, + practice_areas, subject_tags, cites, confidence, + quote_verified, embedding, review_status, + reviewer, reviewed_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, + $12, $13, $14, $15, {reviewed_at_clause})""", + case_law_id, base + j, h["rule_statement"], + h.get("rule_type", "binding"), h.get("reasoning_summary", ""), + h["supporting_quote"], h.get("page_reference", ""), + h.get("practice_areas", []), h.get("subject_tags", []), + h.get("cites", []), confidence, h.get("quote_verified", False), + h.get("embedding"), review_status, reviewer, + ) + await conn.execute( + "UPDATE precedent_chunks SET halacha_extracted_at = now() " + "WHERE id = $1", chunk_id, + ) + return len(halachot) + + async def list_halachot( case_law_id: UUID | None = None, review_status: str | None = None, diff --git a/mcp-server/src/legal_mcp/services/halacha_extractor.py b/mcp-server/src/legal_mcp/services/halacha_extractor.py index 8748066..c7c3797 100644 --- 
a/mcp-server/src/legal_mcp/services/halacha_extractor.py +++ b/mcp-server/src/legal_mcp/services/halacha_extractor.py @@ -342,9 +342,14 @@ async def _extract_chunk( return [], False -async def extract(case_law_id: UUID | str) -> dict: +async def extract(case_law_id: UUID | str, force: bool = False) -> dict: """Extract halachot from an uploaded precedent — globally serialized. + ``force=False`` (default) RESUMES: chunks already extracted (checkpointed) + are skipped, so a crash/interrupt never loses completed work or re-pays for + it. ``force=True`` wipes prior halachot + checkpoints and re-extracts all + (used by explicit re-extraction). + Takes a PostgreSQL advisory lock so only ONE extraction runs at a time across ALL processes (agent retries + batch ``process_pending`` spawn independent OS drivers; an in-process Semaphore can't see them). If another @@ -374,7 +379,7 @@ async def extract(case_law_id: UUID | str) -> dict: "case_law_id": str(case_law_id), } try: - return await _extract_impl(case_law_id) + return await _extract_impl(case_law_id, force=force) finally: await lock_conn.fetchval( "SELECT pg_advisory_unlock($1)", _HALACHA_EXTRACT_LOCK_KEY, @@ -383,11 +388,14 @@ async def extract(case_law_id: UUID | str) -> dict: await pool.release(lock_conn) -async def _extract_impl(case_law_id: UUID) -> dict: +async def _extract_impl(case_law_id: UUID, force: bool = False) -> dict: """Core extraction (caller holds the global advisory lock for the duration). - Idempotent: replaces any existing halachot for this case_law_id. - All inserted rows start as ``review_status='pending_review'``. + Crash-safe + resumable: each chunk's halachot are stored AND the chunk is + checkpointed (``precedent_chunks.halacha_extracted_at``) the moment it + finishes. A crash/interrupt loses at most the in-flight chunk; a re-run + resumes — already-done chunks are skipped, failed/pending chunks retried. + ``force=True`` wipes prior halachot + checkpoints and re-extracts all. 
""" record = await db.get_case_law(case_law_id) if not record: @@ -415,111 +423,135 @@ async def _extract_impl(case_law_id: UUID) -> dict: await db.set_case_law_halacha_status(case_law_id, "completed") return {"status": "no_chunks", "extracted": 0, "stored": 0} - await db.set_case_law_halacha_status(case_law_id, "processing") - await db.delete_halachot(case_law_id) + # force = clean slate; otherwise resume (skip already-checkpointed chunks). + if force: + await db.reset_halacha_extraction(case_law_id) + for c in chunks: + c["halacha_extracted_at"] = None + await db.set_case_law_halacha_status(case_law_id, "processing") + + pending = [c for c in chunks if c.get("halacha_extracted_at") is None] + + # Legacy guard: a precedent extracted before V25 has halachot but NO chunk + # checkpoints. Re-extracting (append-per-chunk) would DUPLICATE them. If + # nothing is checkpointed yet but halachot already exist, backfill the + # checkpoints and treat as complete instead of re-extracting. + if not force and len(pending) == len(chunks): + already = await db.list_halachot(case_law_id=case_law_id, limit=1) + if already: + await db.mark_all_chunks_extracted(case_law_id) + total = len(await db.list_halachot(case_law_id=case_law_id, limit=10_000)) + await db.set_case_law_halacha_status(case_law_id, "completed") + logger.info( + "halacha_extractor: case_law=%s legacy-backfill — %d existing " + "halachot, checkpoints backfilled (no re-extract).", + case_law_id, total, + ) + return {"status": "completed", "extracted": total, "stored": total, + "legacy_backfill": True, "total_chunks": len(chunks)} + + if not pending: + # Resume found nothing left — every chunk already extracted. 
+ total = len(await db.list_halachot(case_law_id=case_law_id, limit=10_000)) + await db.set_case_law_halacha_status(case_law_id, "completed") + return {"status": "completed", "extracted": total, "stored": total, + "resumed": True, "total_chunks": len(chunks)} + + full_text = record.get("full_text") or "" citation = record.get("case_number", "") court = record.get("court", "") date_str = str(record.get("date") or "") context = f"מקור: {citation} — {court}, {date_str}" + idx_by_id = {c["id"]: i for i, c in enumerate(chunks)} sem = asyncio.Semaphore(CHUNK_CONCURRENCY) + store_lock = asyncio.Lock() # serialize per-chunk stores (index continuity) + stored_total = 0 + failed_chunks = 0 - async def _bounded(idx: int, chunk_row: dict) -> tuple[list[dict], bool]: + async def _process(chunk_row: dict) -> None: + nonlocal stored_total, failed_chunks async with sem: - return await _extract_chunk( + items, ok = await _extract_chunk( chunk_row["content"], chunk_row["section_type"], - idx, len(chunks), context, is_binding, + idx_by_id[chunk_row["id"]], len(chunks), context, is_binding, + ) + if not ok: + failed_chunks += 1 # leave chunk un-checkpointed → retried on resume + return + cleaned: list[dict] = [] + for raw in items: + coerced = _coerce_halacha(raw, is_binding=is_binding) + if coerced is None: + continue + coerced["quote_verified"] = _verify_quote( + coerced["supporting_quote"], full_text, + ) + cleaned.append(coerced) + if cleaned: + embed_inputs = [ + f"{h['rule_statement']} — {h['reasoning_summary']}".strip(" —") + for h in cleaned + ] + try: + vectors = await embeddings.embed_texts(embed_inputs, input_type="document") + except Exception as e: + logger.error("halacha_extractor: embeddings failed: %s", e) + vectors = [None] * len(cleaned) + for h, vec in zip(cleaned, vectors): + h["embedding"] = vec + # Store this chunk's halachot AND checkpoint the chunk, atomically. 
+ async with store_lock: + stored_total += await db.store_halachot_for_chunk( + case_law_id, chunk_row["id"], cleaned, ) - chunk_results = await asyncio.gather( - *[_bounded(i, c) for i, c in enumerate(chunks)] - ) - raw_halachot: list[dict] = [] - failed_chunks = 0 - for items, ok in chunk_results: - raw_halachot.extend(items) - if not ok: - failed_chunks += 1 + await asyncio.gather(*[_process(c) for c in pending]) - # If most chunks failed (rate limit storm, claude_session crash, etc.) - # do NOT touch the DB status — leave it 'processing' so the caller can - # retry without the request falling out of the queue. The caller - # (`process_pending_extractions`) is responsible for either retrying or - # finalising the status as 'failed' after retries are exhausted. This - # is the bug that produced 317/10's silent `no_halachot` after a - # 129-chunk neighbour saturated the API. - failure_rate = failed_chunks / len(chunks) if chunks else 0 - if failure_rate >= EXTRACTION_FAILURE_THRESHOLD and not raw_halachot: - logger.error( - "halacha_extractor: case_law=%s extraction_failed — " - "%d/%d chunks failed (rate=%.0f%%), no halachot retrieved. " - "DB status left as 'processing' for caller-level retry.", - case_law_id, failed_chunks, len(chunks), failure_rate * 100, + # Decide final status from what's LEFT (re-read checkpoints). + after = await db.list_precedent_chunks(case_law_id, section_types=EXTRACTABLE_SECTIONS) + if not after: + after = await db.list_precedent_chunks(case_law_id) + still_pending = sum(1 for c in after if c.get("halacha_extracted_at") is None) + total = len(await db.list_halachot(case_law_id=case_law_id, limit=10_000)) + + if still_pending: + # Some chunks failed this run. Leave status 'processing' so a resume + # continues them (no progress is lost — done chunks are checkpointed). 
+ if total == 0 and failed_chunks >= len(pending) * EXTRACTION_FAILURE_THRESHOLD: + logger.error( + "halacha_extractor: case_law=%s extraction_failed — %d/%d pending " + "chunks failed, 0 stored. status left 'processing' for retry.", + case_law_id, failed_chunks, len(pending), + ) + return {"status": "extraction_failed", "extracted": 0, "stored": 0, + "failed_chunks": failed_chunks, "pending_chunks": still_pending, + "total_chunks": len(chunks)} + logger.warning( + "halacha_extractor: case_law=%s partial — %d chunks still pending, " + "%d halachot stored so far. status 'processing' (resume to finish).", + case_law_id, still_pending, total, ) - return { - "status": "extraction_failed", - "extracted": 0, - "stored": 0, - "failed_chunks": failed_chunks, - "total_chunks": len(chunks), - } + return {"status": "partial", "extracted": total, "stored": stored_total, + "pending_chunks": still_pending, "total_chunks": len(chunks)} - if not raw_halachot: - await db.set_case_law_halacha_status(case_law_id, "completed") - return { - "status": "no_halachot", - "extracted": 0, - "stored": 0, - "failed_chunks": failed_chunks, - "total_chunks": len(chunks), - } - - # Validate against the full text of the precedent for the quote check. - full_text = record.get("full_text") or "" - - cleaned: list[dict] = [] - for raw in raw_halachot: - coerced = _coerce_halacha(raw, is_binding=is_binding) - if coerced is None: - continue - coerced["quote_verified"] = _verify_quote( - coerced["supporting_quote"], full_text, - ) - cleaned.append(coerced) - - if not cleaned: - await db.set_case_law_halacha_status(case_law_id, "completed") - return {"status": "no_valid_halachot", "extracted": len(raw_halachot), "stored": 0} - - # Embed rule_statement + reasoning_summary so semantic search hits the - # rule directly rather than the surrounding chunk centroid. 
- embed_inputs = [ - f"{h['rule_statement']} — {h['reasoning_summary']}".strip(" —") - for h in cleaned - ] - try: - vectors = await embeddings.embed_texts(embed_inputs, input_type="document") - except Exception as e: - logger.error("halacha_extractor: embeddings failed: %s", e) - vectors = [None] * len(cleaned) - - for halacha, vec in zip(cleaned, vectors): - halacha["embedding"] = vec - - stored = await db.store_halachot(case_law_id, cleaned) - - verified = sum(1 for h in cleaned if h["quote_verified"]) + # All chunks done. + stored = total + verified = sum(1 for h in await db.list_halachot(case_law_id=case_law_id, limit=10_000) + if h.get("quote_verified")) await db.set_case_law_halacha_status(case_law_id, "completed") logger.info( - "halacha_extractor: case_law=%s extracted=%d cleaned=%d verified=%d stored=%d", - case_law_id, len(raw_halachot), len(cleaned), verified, stored, + "halacha_extractor: case_law=%s completed — %d halachot stored " + "(%d new this run), %d quote-verified, %d chunks", + case_law_id, total, stored_total, verified, len(chunks), ) return { "status": "completed", - "extracted": len(raw_halachot), - "valid": len(cleaned), + "extracted": total, "verified": verified, "stored": stored, + "stored_this_run": stored_total, + "total_chunks": len(chunks), } diff --git a/mcp-server/src/legal_mcp/services/precedent_library.py b/mcp-server/src/legal_mcp/services/precedent_library.py index 34618bb..eae3ea8 100644 --- a/mcp-server/src/legal_mcp/services/precedent_library.py +++ b/mcp-server/src/legal_mcp/services/precedent_library.py @@ -156,7 +156,10 @@ async def reextract_halachot( # bad data. See note in db.request_metadata_extraction. await progress("extracting_halachot", 50, "מחלץ הלכות מחדש") - result = await halacha_extractor.extract(case_law_id) + # Explicit re-extraction = clean slate (force): wipe prior halachot + + # per-chunk checkpoints and redo all. 
(Queue draining / resume uses the + # default force=False so an interrupted run continues where it stopped.) + result = await halacha_extractor.extract(case_law_id, force=True) # Clear the queue timestamp on completion so the UI badge / worker queue # don't keep showing this row. The queue worker (process_pending_extractions) # already does this; mirror it here so per-record extraction drains too.