"""Extract binding legal rules (הלכות) from external court rulings. Runs Claude (via the local headless ``claude -p`` bridge) over the legal_analysis / ruling / conclusion chunks of a precedent, returns a structured list of halachot, validates each one against the source text, embeds the rule statement, and stores everything as ``pending_review`` in the ``halachot`` table. All extraction is idempotent — calling ``extract(case_law_id)`` twice deletes prior rows for that precedent first. Trust model: Per chair decision, NO halacha is auto-published. Every extracted halacha enters with ``review_status='pending_review'``. The chair approves/rejects via the UI, and only ``approved`` (or ``published``) rows are visible to ``search_precedent_library`` and the writing agents. """ from __future__ import annotations import asyncio import logging import re from uuid import UUID from legal_mcp import config from legal_mcp.config import parse_llm_json from legal_mcp.services import ( claude_session, db, embeddings, halacha_quality, proofreader, ) logger = logging.getLogger(__name__) # Concurrency model mirrors claims_extractor — each ``claude -p`` subprocess # holds ~300 MB RSS, so we cap parallel chunks to keep the box healthy. # Env-tunable (HALACHA_CHUNK_CONCURRENCY) — see config.py. CHUNK_CONCURRENCY = config.HALACHA_CHUNK_CONCURRENCY # Global cross-process serialization key for halacha extraction. Every # extraction (whichever process/agent/driver launched it) takes a PostgreSQL # advisory lock on this key first; if another extraction already holds it the # call returns ``status='busy'`` and the request stays pending for the next # drain. This makes "one extraction at a time" hold across SEPARATE OS # processes (agent fallback retries spawn independent `python -c` drivers — an # in-process Semaphore cannot see them). Root cause of the 2026-05-31 freeze: # 4-5 overlapping driver processes × CHUNK_CONCURRENCY each → 12-16 concurrent # xhigh `claude -p` procs → load 69 → hard reboot. _HALACHA_EXTRACT_LOCK_KEY = 0x48414C41 # 'HALA' CHUNK_RETRY_ATTEMPTS = 1 # If at least this fraction of chunks crash and the precedent yields zero # halachot, treat the run as `extraction_failed` rather than `no_halachot`. # Picked at 0.5 so a precedent that genuinely has no holdings (e.g. a remand # ruling that just sends the case back) isn't misflagged just because a few # chunks timed out, while a real rate-limit storm — which kills nearly every # call — is correctly distinguished and re-tried by the caller. EXTRACTION_FAILURE_THRESHOLD = 0.5 # Sections from which to extract. facts/intro/appellant_claims/respondent_claims # never contain holdings, only positions, so we skip them. EXTRACTABLE_SECTIONS = ("legal_analysis", "ruling", "conclusion") # Two prompts — choose by source's is_binding flag. # # The binding prompt extracts strict halachot (rules a future panel MUST # follow). It rejects obiter dicta, factual findings, and citations of # other rulings that the present court only mentioned in passing. # # The persuasive prompt is for sources that don't establish binding law # (most appeals committee decisions, district courts on planning matters, # etc.). For those, the value is in **how the panel reasoned and applied** # established law to facts — not in new halachot. The user explicitly # wants to be able to cite "another committee reached the same conclusion" # even though it is not binding. # # The schema's rule_type field accepts six values: # binding | interpretive | procedural | obiter | application | persuasive HALACHA_EXTRACTION_PROMPT_BINDING = """אתה משפטן בכיר המתמחה בדיני תכנון ובניה (ועדות ערר, היטל השבחה, פיצויים לפי סעיף 197 לחוק התכנון והבניה). תפקידך: לחלץ הלכות מחייבות מתוך פסק דין/החלטה משפטית של ערכאה עליונה (עליון / מנהלי). ## הגדרות מחייבות הלכה (binding rule) = כלל משפטי שהפסק קובע או מאמץ ומיישם, באופן שניתן להסתמך עליו בהחלטות עתידיות. לא-הלכה (אין לחלץ): - אמרת אגב (obiter dicta) — הערות שאינן הכרחיות להכרעה. - **סוגיה שהערכאה לא הכריעה בה** — אם בית המשפט אומר במפורש "אין צורך להכריע", "מבלי לקבוע מסמרות", "איני רואה לקבוע מסמרות", "למעלה מן הצורך", "אגב אורחא" — זו אינה הלכה. מבחן ההיפוך (Wambaugh): אם שלילת הכלל לא הייתה משנה את תוצאת הפסק — זו אמרת אגב, לא הלכה. - ממצאים עובדתיים ספציפיים לתיק או יישום על נסיבות התיק ("העורר לא הוכיח X", "במקרה דנן", שמות צדדים, סכומים/מספרים קונקרטיים) — חלץ את **העיקרון המופשט** בלבד, לא את יישומו על עובדות התיק. - ציטוטי הלכות מפסקי דין אחרים שלא אומצו במפורש בפסק זה. - הצהרות על דין קיים שאינן מיושמות בהכרעה, וכן ניסוח שהוא העתק של הציטוט ללא הפשטה. הבחנה קריטית: כאשר הפסק מצטט הלכה מפסק קודם, חלץ אותה רק אם בית המשפט בפסק הנוכחי **מאמץ ומחיל** אותה (לא רק מזכיר אותה ברקע). ## תחומים אפשריים (practice_areas) — תחומי ועדת הערר בלבד - rishuy_uvniya — רישוי ובניה (תיקי 1xxx: היתרים, שימוש חורג, תכניות, קווי בניין, גובה, חניה) - betterment_levy — היטל השבחה (תיקי 8xxx: שומה, מערכות, תכניות המקנות בה, מועד קובע, סופיות ההחלטה) - compensation_197 — פיצויים לפי ס' 197 (תיקי 9xxx: פגיעה במקרקעין, ירידת ערך, ס' 200/פטור) הלכה אחת יכולה לחול על כמה תחומים — practice_areas הוא array ולא string יחיד. ## סוגי הלכה (rule_type) - binding — הלכה מחייבת שהוחלה על התיק. - interpretive — פרשנות סעיף חוק/תכנית שאומצה. - procedural — כלל פרוצדורלי (סמכות, מועדים, הליכי שמיעה). - obiter — אמרת אגב חשובה (חלץ רק אם משמעותית; סמן confidence נמוך). ## פלט נדרש החזר JSON array בלבד, ללא markdown, ללא הסברים. דוגמה: [ { "rule_statement": "ניסוח הכלל בלשון משפטית מדויקת בגוף שלישי, 1-3 משפטים.", "rule_type": "binding", "reasoning_summary": "תמצית ההיגיון: למה בית המשפט הגיע לכלל הזה (1-2 משפטים).", "supporting_quote": "ציטוט מילולי מדויק מהפסק התומך בכלל. חייב להופיע מילה במילה בטקסט הקלט.", "page_reference": "פס' 12 / עמ' 8 — ככל שניתן לזהות מהקלט.", "practice_areas": ["betterment_levy"], "subject_tags": ["מועד_קביעת_שומה", "סופיות_ההחלטה"], "cites": ["עע\\"מ 3975/22"], "confidence": 0.85 } ] ## כללי איכות 1. **נאמנות מוחלטת לציטוט** — supporting_quote חייב להיות הדבקה מדויקת ו**שלמה** מהקלט (משפט שלם, לא חתוך באמצע). אם אין ציטוט מתאים — אל תמציא הלכה. 2. **מספר הלכות** — פסק רגיל מכיל 1-4 הלכות מחייבות. אל תמתח את הרשימה. אם אין הלכה — החזר []. 3. **לא לפצל יתר על המידה — קריטי** — כל הלכה = שאלה משפטית מובחנת אחת. אם כמה סעיפים מבטאים פנים שונים של אותה שאלה משפטית — אחד אותם לכלל אחד (בחר את הניסוח הכללי/המחייב ביותר). אל תחזיר את אותו עיקרון בכמה ניסוחים. 4. **שפה והפשטה** — rule_statement בעברית משפטית מקצועית בגוף שלישי, כעיקרון בר-הכללה לתיקים עתידיים — **לא** צמצום מילולי של הציטוט ולא קביעה התלויה בעובדות התיק. 5. **subject_tags** — 2-5 תגיות בעברית, snake_case (חניה, קווי_בניין, שיקול_דעת, פגם_פרוצדורלי, סמכות, מועדים, פגיעה_במקרקעין, ירידת_ערך). 6. **confidence** — 0..1. מתחת ל-0.7 = ספק לגבי היות זה הלכה מחייבת. """ HALACHA_EXTRACTION_PROMPT_PERSUASIVE = """אתה משפטן בכיר המתמחה בדיני תכנון ובניה. תפקידך: לחלץ עקרונות, יישומים ומסקנות מתוך החלטה של ועדת ערר אחרת או של בית משפט שאינו ערכאה עליונה לסוגיה. ## חשוב — מה לחלץ ומה לא המקור הזה **אינו** מקור להלכות מחייבות חדשות (binding rules). הלכות מחייבות מגיעות מהעליון/מנהלי. עם זאת, יש כאן ערך משמעותי שצריך לחלץ — איך הפנל הזה ניתח ויישם את הדין הקיים. כשנכתוב החלטה עתידית, נצטט מהמקור הזה כ"גם ועדת הערר ב-X הגיעה למסקנה דומה" — לא כסמכות מחייבת, אלא כתמיכה משכנעת. **יש לחלץ:** - **יישום של הלכה ידועה** (rule_type=`application`) — הפנל החיל הלכה ידועה (של עליון/מנהלי) על עובדות הנידונות. תצטט את ניסוח הכלל **כפי שהוצג כאן** (לא בהכרח כפי שנקבע במקור) ואת התוצאה. - **עקרון פרשני שאומץ** (rule_type=`interpretive`) — איך הפנל פירש סעיף חוק / תכנית, באופן שניתן לאמץ. - **כלל פרוצדורלי** (rule_type=`procedural`) — קביעות בנושאי סמכות, מועדים, הליך. - **מסקנה מנומקת ומשכנעת** (rule_type=`persuasive`) — מסקנה שלמה של הפנל בסוגיה, עם ההיגיון התומך, ניתנת לציטוט כאסמכתא משכנעת. **אין לחלץ:** - ממצאים עובדתיים ספציפיים לתיק או יישום על נסיבות התיק ("העורר לא הוכיח X", "במקרה דנן", שמות צדדים, סכומים קונקרטיים) — חלץ את העיקרון/היישום בניסוח בר-הכללה בלבד. - סוגיה שהפנל לא הכריע בה ("אין צורך להכריע", "מבלי לקבוע מסמרות", "למעלה מן הצורך"). - ציטוטים מפסקי דין אחרים ללא ניתוח של הפנל, וכן ניסוח שהוא העתק של הציטוט ללא הפשטה. - אמרות אגב חסרות חשיבות. ## תחומים אפשריים (practice_areas) — תחומי ועדת הערר בלבד - rishuy_uvniya — רישוי ובניה (תיקי 1xxx: היתרים, שימוש חורג, תכניות, קווי בניין, גובה, חניה) - betterment_levy — היטל השבחה (תיקי 8xxx: שומה, מערכות, תכניות המקנות בה, מועד קובע, סופיות ההחלטה) - compensation_197 — פיצויים לפי ס' 197 (תיקי 9xxx: פגיעה במקרקעין, ירידת ערך, ס' 200/פטור) ## פלט נדרש החזר JSON array בלבד, ללא markdown, ללא הסברים: [ { "rule_statement": "ניסוח הכלל / המסקנה / היישום בלשון משפטית מדויקת, 1-3 משפטים.", "rule_type": "application", "reasoning_summary": "תמצית ההיגיון של הפנל (1-2 משפטים).", "supporting_quote": "ציטוט מילולי מדויק מהקלט שתומך בכלל. חייב להופיע מילה במילה.", "page_reference": "פס' 12 / עמ' 8 — ככל שניתן לזהות.", "practice_areas": ["betterment_levy"], "subject_tags": ["מועד_קביעת_שומה", "תכנית_רחביה"], "cites": ["עע\\"מ 3975/22"], "confidence": 0.85 } ] ## כללי איכות 1. **נאמנות מוחלטת לציטוט** — supporting_quote חייב להיות הדבקה מדויקת מהקלט. אם אין ציטוט מתאים — אל תוסיף את ההלכה. 2. **מספר הלכות** — החלטה ארוכה של ועדת ערר יכולה להניב 2-8 פריטים (יישומים + מסקנות). אל תמתח את הרשימה. אם אין מה לחלץ — החזר []. 3. **rule_type מדויק** — application = יישום הלכה ידועה. interpretive = פרשנות. procedural = פרוצדורה. persuasive = מסקנה כללית בעלת ערך כאסמכתא. 4. **לא לפצל יתר על המידה — קריטי** — כל פריט = שאלה משפטית מובחנת אחת. פנים שונים של אותה שאלה = פריט אחד (בחר את הניסוח הכללי ביותר). אל תחזיר את אותו עיקרון בכמה ניסוחים. 5. **שפה** — עברית משפטית מקצועית, גוף שלישי. 6. **subject_tags** — 2-5 תגיות בעברית, snake_case. 7. **confidence** — 0..1. דייק. """ _VALID_PRACTICE_AREAS = {"rishuy_uvniya", "betterment_levy", "compensation_197"} _VALID_RULE_TYPES = { "binding", "interpretive", "procedural", "obiter", "application", "persuasive", } def _normalize_for_comparison(text: str) -> str: """Normalize Hebrew text for substring matching. Collapses whitespace and unifies the half-dozen Hebrew quote-mark variants. Use ``proofreader._fix_hebrew_quotes`` for the quote part so we stay consistent with the proofreader pipeline. """ fixed = proofreader._fix_hebrew_quotes(text) # Collapse all whitespace (newlines, tabs, multiple spaces) to a single space. return re.sub(r"\s+", " ", fixed).strip() def _verify_quote(supporting_quote: str, full_text: str) -> bool: """Return True if ``supporting_quote`` appears verbatim in ``full_text`` after Hebrew quote/whitespace normalization. The LLM occasionally trims a leading/trailing word from the quote; we accept the quote if at least 90% of its characters match a contiguous substring of the source. """ if not supporting_quote.strip(): return False normalized_quote = _normalize_for_comparison(supporting_quote) normalized_text = _normalize_for_comparison(full_text) if not normalized_quote: return False if normalized_quote in normalized_text: return True # Fallback: try the inner 90% of the quote (drops boundary trim). if len(normalized_quote) >= 30: trim = max(2, len(normalized_quote) // 20) inner = normalized_quote[trim:-trim] if inner and inner in normalized_text: return True return False def _coerce_halacha(raw: dict, is_binding: bool = True) -> dict | None: """Validate and normalize one LLM-returned halacha dict. Returns ``None`` if the entry is missing required fields. ``is_binding`` only affects the default rule_type when the LLM returned an unknown value — for binding sources we default to ``binding``, otherwise to ``persuasive`` (never pretend an appeals committee created halacha). """ if not isinstance(raw, dict): return None rule_statement = (raw.get("rule_statement") or "").strip() supporting_quote = (raw.get("supporting_quote") or "").strip() if not rule_statement or not supporting_quote: return None default_rule_type = "binding" if is_binding else "persuasive" rule_type = (raw.get("rule_type") or default_rule_type).strip().lower() if rule_type not in _VALID_RULE_TYPES: rule_type = default_rule_type # Guard: don't let a non-binding source produce 'binding' rule_type if not is_binding and rule_type == "binding": rule_type = "persuasive" practice_areas_raw = raw.get("practice_areas") or [] if isinstance(practice_areas_raw, str): practice_areas_raw = [practice_areas_raw] practice_areas = [p for p in practice_areas_raw if p in _VALID_PRACTICE_AREAS] subject_tags_raw = raw.get("subject_tags") or [] if isinstance(subject_tags_raw, str): subject_tags_raw = [subject_tags_raw] subject_tags = [str(t).strip() for t in subject_tags_raw if str(t).strip()] cites_raw = raw.get("cites") or [] if isinstance(cites_raw, str): cites_raw = [cites_raw] cites = [str(c).strip() for c in cites_raw if str(c).strip()] try: confidence = float(raw.get("confidence", 0.0)) except (TypeError, ValueError): confidence = 0.0 confidence = max(0.0, min(1.0, confidence)) return { "rule_statement": rule_statement, "rule_type": rule_type, "reasoning_summary": (raw.get("reasoning_summary") or "").strip(), "supporting_quote": supporting_quote, "page_reference": (raw.get("page_reference") or "").strip(), "practice_areas": practice_areas, "subject_tags": subject_tags, "cites": cites, "confidence": confidence, } async def _nli_check(items: list[dict]) -> list[str]: """Entailment verdict per item (rule ⊨ quote) via claude_session — #81.3. Local CLI, zero cost. FAILS OPEN: any error returns all-'entailed' so a flaky/unavailable judge (e.g. in the container) never blocks a halacha. """ if not items: return [] try: raw = await claude_session.query_json( halacha_quality.build_nli_prompt(items), system=halacha_quality.NLI_SYSTEM, model=config.HALACHA_NLI_MODEL or None, effort=config.HALACHA_NLI_EFFORT or None, ) except Exception as e: logger.warning("halacha NLI check failed (fail-open, no flags): %s", e) return ["entailed"] * len(items) return halacha_quality.parse_nli_verdicts(raw, len(items)) def _consolidation_priority(r: dict): """Canonical = the row to KEEP within a fold group (lower sorts first).""" status_rank = {"approved": 0, "published": 0, "pending_review": 1}.get( r.get("review_status"), 2) return ( status_rank, -float(r.get("confidence") or 0.0), 0 if r.get("quote_verified") else 1, -len(r.get("rule_statement") or ""), str(r["id"]), ) async def _consolidate_precedent(case_law_id: UUID) -> int: """#81.5 — fold facets of the SAME legal question into one canonical. Per-precedent claude_session pass (local CLI, zero cost). Keeps the best row of each fold group; marks the rest ``rejected`` (reversible — out of the active corpus AND the review queue, but recoverable). FOLD-ONLY. Fails OPEN: any error / parse failure → 0 folds (never touches data on doubt). """ if not config.HALACHA_CONSOLIDATE_ENABLED: return 0 try: rows = [ r for r in await db.list_halachot(case_law_id=case_law_id, limit=10_000) if r.get("review_status") != "rejected" ] if len(rows) < 2: return 0 by_idx = {r["halacha_index"]: r for r in rows} raw = await claude_session.query_json( halacha_quality.build_consolidation_prompt(rows), system=halacha_quality.CONSOLIDATE_SYSTEM, model=config.HALACHA_CONSOLIDATE_MODEL or None, effort=config.HALACHA_CONSOLIDATE_EFFORT or None, ) groups = halacha_quality.parse_fold_groups(raw) if not groups: return 0 canonicals: set[str] = set() losers: set[str] = set() for g in groups: members = [by_idx[i] for i in g if i in by_idx] if len(members) < 2: continue members.sort(key=_consolidation_priority) canonicals.add(str(members[0]["id"])) for m in members[1:]: losers.add(str(m["id"])) # Never reject a row that is the canonical of any group. loser_ids = [i for i in losers if i not in canonicals] if not loser_ids: return 0 return await db.update_halachot_batch( loser_ids, "rejected", reviewer="auto-consolidated (#81.5 facet-fold)", ) except Exception as e: logger.warning( "halacha consolidation failed for %s (fail-open, no folds): %s", case_law_id, e, ) return 0 async def _extract_chunk( chunk_text: str, section_type: str, chunk_index: int, chunk_total: int, context: str, is_binding: bool, effort: str | None = None, ) -> tuple[list[dict], bool]: """Run the halacha extractor on one chunk with retry. Returns ``(halachot, succeeded)`` so the caller can distinguish "Claude said there are no halachot here" (`(_, True)`) from "every attempt crashed/timed out" (`(_, False)`). Without this distinction a precedent that hit a rate-limit storm looks identical to one that genuinely has no halachot — and gets silently marked `no_halachot`. The prompt branches on ``is_binding`` so non-binding sources (other appeals committees, district courts) yield application/persuasive entries rather than a forced 0-result strict halacha pass. """ base_prompt = ( HALACHA_EXTRACTION_PROMPT_BINDING if is_binding else HALACHA_EXTRACTION_PROMPT_PERSUASIVE ) chunk_label = f" (חלק {chunk_index + 1}/{chunk_total})" if chunk_total > 1 else "" # Pass the static instruction prompt as `system` so the SDK path can cache # it (5-min ephemeral). Only the per-chunk content varies via `prompt`. user_msg = ( f"## הקלט\n" f"סוג קטע: {section_type}\n" f"{context}{chunk_label}\n\n" f"--- תחילת הטקסט ---\n{chunk_text}\n--- סוף הטקסט ---" ) last_err: Exception | None = None for attempt in range(CHUNK_RETRY_ATTEMPTS + 1): try: result = await claude_session.query_json( user_msg, system=base_prompt, model=config.HALACHA_EXTRACT_MODEL or None, effort=(effort or config.HALACHA_EXTRACT_EFFORT) or None, ) except Exception as e: last_err = e logger.warning( "halacha_extractor chunk %d/%d attempt %d raised: %s", chunk_index + 1, chunk_total, attempt + 1, e, ) continue if isinstance(result, list): return result, True logger.warning( "halacha_extractor chunk %d/%d attempt %d returned non-list (%s)", chunk_index + 1, chunk_total, attempt + 1, type(result).__name__, ) logger.error( "halacha_extractor chunk %d/%d failed after %d attempts: %s", chunk_index + 1, chunk_total, CHUNK_RETRY_ATTEMPTS + 1, last_err, ) return [], False async def extract(case_law_id: UUID | str, force: bool = False, effort: str | None = None) -> dict: """Extract halachot from an uploaded precedent — globally serialized. ``effort`` overrides the per-chunk LLM effort (default ``config.HALACHA_EXTRACT_EFFORT`` = xhigh). Bulk queue-drains pass the lighter ``config.HALACHA_BULK_EXTRACT_EFFORT`` to cut wall-clock at scale. ``force=False`` (default) RESUMES: chunks already extracted (checkpointed) are skipped, so a crash/interrupt never loses completed work or re-pays for it. ``force=True`` wipes prior halachot + checkpoints and re-extracts all (used by explicit re-extraction). Takes a PostgreSQL advisory lock so only ONE extraction runs at a time across ALL processes (agent retries + batch ``process_pending`` spawn independent OS drivers; an in-process Semaphore can't see them). If another extraction already holds the lock this returns ``status='busy'`` and the precedent stays pending for the next drain — no second xhigh run piles on (this is the fix for the 2026-05-31 box freeze). Returns: ``{"status": "...", "extracted": N, "verified": M, "stored": K, ...}`` """ if isinstance(case_law_id, str): case_law_id = UUID(case_law_id) pool = await db.get_pool() lock_conn = await pool.acquire() try: got = await lock_conn.fetchval( "SELECT pg_try_advisory_lock($1)", _HALACHA_EXTRACT_LOCK_KEY, ) if not got: logger.warning( "halacha extract: global lock held by another extraction — " "skipping %s (stays pending for next drain)", case_law_id, ) return { "status": "busy", "extracted": 0, "stored": 0, "case_law_id": str(case_law_id), } try: return await _extract_impl(case_law_id, force=force, effort=effort) finally: await lock_conn.fetchval( "SELECT pg_advisory_unlock($1)", _HALACHA_EXTRACT_LOCK_KEY, ) finally: await pool.release(lock_conn) async def _extract_impl(case_law_id: UUID, force: bool = False, effort: str | None = None) -> dict: """Core extraction (caller holds the global advisory lock for the duration). Crash-safe + resumable: each chunk's halachot are stored AND the chunk is checkpointed (``precedent_chunks.halacha_extracted_at``) the moment it finishes. A crash/interrupt loses at most the in-flight chunk; a re-run resumes — already-done chunks are skipped, failed/pending chunks retried. ``force=True`` wipes prior halachot + checkpoints and re-extracts all. """ record = await db.get_case_law(case_law_id) if not record: return {"status": "not_found", "extracted": 0, "stored": 0} is_binding = bool(record.get("is_binding")) # Try the targeted sections first (legal_analysis / ruling / conclusion). # If the chunker labeled everything as 'other' (common when a ruling # uses non-standard headings or the section markers aren't bracketed # cleanly), fall back to ALL chunks — better to over-include than to # silently skip a ruling that has reasoning under an unexpected label. chunks = await db.list_precedent_chunks( case_law_id, section_types=EXTRACTABLE_SECTIONS, ) if not chunks: chunks = await db.list_precedent_chunks(case_law_id) if chunks: logger.info( "halacha_extractor: case_law=%s — no targeted sections, " "falling back to all %d chunks", case_law_id, len(chunks), ) if not chunks: await db.set_case_law_halacha_status(case_law_id, "completed") return {"status": "no_chunks", "extracted": 0, "stored": 0} # force = clean slate; otherwise resume (skip already-checkpointed chunks). if force: await db.reset_halacha_extraction(case_law_id) for c in chunks: c["halacha_extracted_at"] = None await db.set_case_law_halacha_status(case_law_id, "processing") pending = [c for c in chunks if c.get("halacha_extracted_at") is None] # Legacy guard: a precedent extracted before V25 has halachot but NO chunk # checkpoints. Re-extracting (append-per-chunk) would DUPLICATE them. If # nothing is checkpointed yet but halachot already exist, backfill the # checkpoints and treat as complete instead of re-extracting. if not force and len(pending) == len(chunks): already = await db.list_halachot(case_law_id=case_law_id, limit=1) if already: await db.mark_all_chunks_extracted(case_law_id) total = len(await db.list_halachot(case_law_id=case_law_id, limit=10_000)) await db.set_case_law_halacha_status(case_law_id, "completed") logger.info( "halacha_extractor: case_law=%s legacy-backfill — %d existing " "halachot, checkpoints backfilled (no re-extract).", case_law_id, total, ) return {"status": "completed", "extracted": total, "stored": total, "legacy_backfill": True, "total_chunks": len(chunks)} if not pending: # Resume found nothing left — every chunk already extracted. total = len(await db.list_halachot(case_law_id=case_law_id, limit=10_000)) await db.set_case_law_halacha_status(case_law_id, "completed") return {"status": "completed", "extracted": total, "stored": total, "resumed": True, "total_chunks": len(chunks)} full_text = record.get("full_text") or "" citation = record.get("case_number", "") court = record.get("court", "") date_str = str(record.get("date") or "") context = f"מקור: {citation} — {court}, {date_str}" idx_by_id = {c["id"]: i for i, c in enumerate(chunks)} sem = asyncio.Semaphore(CHUNK_CONCURRENCY) store_lock = asyncio.Lock() # serialize per-chunk stores (index continuity) stored_total = 0 failed_chunks = 0 async def _process(chunk_row: dict) -> None: nonlocal stored_total, failed_chunks async with sem: items, ok = await _extract_chunk( chunk_row["content"], chunk_row["section_type"], idx_by_id[chunk_row["id"]], len(chunks), context, is_binding, effort, ) if not ok: failed_chunks += 1 # leave chunk un-checkpointed → retried on resume return cleaned: list[dict] = [] for raw in items: coerced = _coerce_halacha(raw, is_binding=is_binding) if coerced is None: continue coerced["quote_verified"] = _verify_quote( coerced["supporting_quote"], full_text, ) # Strict-rubric quality gate (docs/halacha-strict-rubric.md): # flags block auto-approval (route to pending_review); a court # non-decision is re-typed obiter so it never reads as a holding. flags = halacha_quality.compute_quality_flags( coerced["rule_statement"], coerced["supporting_quote"], coerced["reasoning_summary"], coerced["quote_verified"], ) coerced["quality_flags"] = flags if halacha_quality.FLAG_NON_DECISION in flags and coerced["rule_type"] != "obiter": coerced["rule_type"] = "obiter" cleaned.append(coerced) # #81.3 NLI entailment — one batched judge call per chunk (fail-open). if config.HALACHA_NLI_ENABLED and cleaned: verdicts = await _nli_check(cleaned) for h, v in zip(cleaned, verdicts): if v != "entailed" and halacha_quality.FLAG_NLI_UNSUPPORTED not in h["quality_flags"]: h["quality_flags"].append(halacha_quality.FLAG_NLI_UNSUPPORTED) if cleaned: embed_inputs = [ f"{h['rule_statement']} — {h['reasoning_summary']}".strip(" —") for h in cleaned ] try: vectors = await embeddings.embed_texts(embed_inputs, input_type="document") except Exception as e: logger.error("halacha_extractor: embeddings failed: %s", e) vectors = [None] * len(cleaned) for h, vec in zip(cleaned, vectors): h["embedding"] = vec # Store this chunk's halachot AND checkpoint the chunk, atomically. async with store_lock: stored_total += await db.store_halachot_for_chunk( case_law_id, chunk_row["id"], cleaned, ) await asyncio.gather(*[_process(c) for c in pending]) # Decide final status from what's LEFT (re-read checkpoints). after = await db.list_precedent_chunks(case_law_id, section_types=EXTRACTABLE_SECTIONS) if not after: after = await db.list_precedent_chunks(case_law_id) still_pending = sum(1 for c in after if c.get("halacha_extracted_at") is None) total = len(await db.list_halachot(case_law_id=case_law_id, limit=10_000)) if still_pending: # Some chunks failed this run. Leave status 'processing' so a resume # continues them (no progress is lost — done chunks are checkpointed). if total == 0 and failed_chunks >= len(pending) * EXTRACTION_FAILURE_THRESHOLD: logger.error( "halacha_extractor: case_law=%s extraction_failed — %d/%d pending " "chunks failed, 0 stored. status left 'processing' for retry.", case_law_id, failed_chunks, len(pending), ) return {"status": "extraction_failed", "extracted": 0, "stored": 0, "failed_chunks": failed_chunks, "pending_chunks": still_pending, "total_chunks": len(chunks)} logger.warning( "halacha_extractor: case_law=%s partial — %d chunks still pending, " "%d halachot stored so far. status 'processing' (resume to finish).", case_law_id, still_pending, total, ) return {"status": "partial", "extracted": total, "stored": stored_total, "pending_chunks": still_pending, "total_chunks": len(chunks)} # All chunks done. #81.5: fold cross-chunk facets of one legal question # (the prompt dedups within a chunk; this catches across chunks). folded = await _consolidate_precedent(case_law_id) stored = total verified = sum(1 for h in await db.list_halachot(case_law_id=case_law_id, limit=10_000) if h.get("quote_verified")) await db.set_case_law_halacha_status(case_law_id, "completed") logger.info( "halacha_extractor: case_law=%s completed — %d halachot stored " "(%d new this run), %d quote-verified, %d folded, %d chunks", case_law_id, total, stored_total, verified, folded, len(chunks), ) return { "status": "completed", "extracted": total, "verified": verified, "folded": folded, "stored": stored, "stored_this_run": stored_total, "total_chunks": len(chunks), }