legal-ai/mcp-server/src/legal_mcp/services/halacha_extractor.py
Chaim 885cba543e feat(halacha): lighter effort for BULK queue-drain extraction (speed at scale)
xhigh is the quality sweet-spot for a single precedent but very slow at scale
(64-chunk case ≈ 20 min). Bulk queue-drains (process_pending over many
precedents) now use a lighter effort to cut wall-clock; interactive single
re-extraction keeps xhigh quality.

- config.HALACHA_BULK_EXTRACT_EFFORT (env, default 'high'; set 'medium' for max
  speed, 'xhigh' to match single).
- extract()/_extract_impl()/_extract_chunk() take an `effort` override threaded
  to claude_session.query_json; None falls back to HALACHA_EXTRACT_EFFORT (xhigh).
- process_pending_extractions(kind='halacha') passes the bulk effort; single
  reextract_halachot keeps xhigh.

Verified end-to-end (mocked LLM): _extract_chunk(effort='medium') → query_json
effort='medium'; effort=None → 'xhigh' fallback. Closes the open item in #72.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 21:34:13 +00:00

566 lines
28 KiB
Python

"""Extract binding legal rules (הלכות) from external court rulings.

Runs Claude (via the local headless ``claude -p`` bridge) over the
legal_analysis / ruling / conclusion chunks of a precedent, returns a
structured list of halachot, validates each one against the source text,
embeds the rule statement, and stores everything as ``pending_review`` in
the ``halachot`` table.

Extraction is idempotent: by default ``extract(case_law_id)`` resumes from
per-chunk checkpoints and skips chunks that are already done, so calling it
twice does not duplicate rows. ``extract(case_law_id, force=True)`` deletes
the prior rows and checkpoints for that precedent first.

Trust model:
    Per chair decision, NO halacha is auto-published. Every extracted
    halacha enters with ``review_status='pending_review'``. The chair
    approves/rejects via the UI, and only ``approved`` (or ``published``)
    rows are visible to ``search_precedent_library`` and the writing
    agents.
"""
from __future__ import annotations

import asyncio
import logging
import re
from uuid import UUID

from legal_mcp import config
from legal_mcp.config import parse_llm_json
from legal_mcp.services import claude_session, db, embeddings, proofreader

logger = logging.getLogger(__name__)

# Concurrency model mirrors claims_extractor — each ``claude -p`` subprocess
# holds ~300 MB RSS, so we cap parallel chunks to keep the box healthy.
# Env-tunable (HALACHA_CHUNK_CONCURRENCY) — see config.py.
CHUNK_CONCURRENCY = config.HALACHA_CHUNK_CONCURRENCY
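A minimal sketch of the per-precedent cap, assuming a hypothetical stand-in coroutine ``fake_chunk_call`` in place of a real ``claude -p`` subprocess and a cap of 3 (the real value comes from ``config.HALACHA_CHUNK_CONCURRENCY``). It records the peak number of calls in flight to show that an ``asyncio.Semaphore`` never lets more than the cap run at once:

```python
import asyncio

CAP = 3  # hypothetical stand-in for config.HALACHA_CHUNK_CONCURRENCY


async def run_bounded(n_chunks: int, cap: int = CAP) -> int:
    """Run n_chunks fake chunk calls under a semaphore; return peak concurrency."""
    sem = asyncio.Semaphore(cap)
    in_flight = 0
    peak = 0

    async def fake_chunk_call(i: int) -> int:
        nonlocal in_flight, peak
        async with sem:  # at most `cap` bodies run at the same time
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # pretend to wait on a subprocess
            in_flight -= 1
        return i

    await asyncio.gather(*(fake_chunk_call(i) for i in range(n_chunks)))
    return peak
```

The cap only holds inside one event loop. It cannot see a second OS process running the same code, which is why the module also takes a cross-process lock.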
# Global cross-process serialization key for halacha extraction. Every
# extraction (whichever process/agent/driver launched it) takes a PostgreSQL
# advisory lock on this key first; if another extraction already holds it the
# call returns ``status='busy'`` and the request stays pending for the next
# drain. This makes "one extraction at a time" hold across SEPARATE OS
# processes (agent fallback retries spawn independent `python -c` drivers — an
# in-process Semaphore cannot see them). Root cause of the 2026-05-31 freeze:
# 4-5 overlapping driver processes × CHUNK_CONCURRENCY each → 12-16 concurrent
# xhigh `claude -p` procs → load 69 → hard reboot.
_HALACHA_EXTRACT_LOCK_KEY = 0x48414C41 # 'HALA'
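The lock key is just the four ASCII bytes of ``'HALA'`` read as a big-endian integer. A quick standard-library check (``advisory_key`` is a hypothetical helper, not part of this module) confirms the constant and shows why any tag of up to 8 ASCII bytes fits PostgreSQL's signed ``bigint`` advisory-lock key:

```python
def advisory_key(tag: str) -> int:
    """Pack an ASCII tag into a big-endian integer (hypothetical helper)."""
    raw = tag.encode("ascii")
    if len(raw) > 8:
        raise ValueError("tag must fit in a 64-bit advisory-lock key")
    # ASCII bytes are <= 0x7F, so even 8 of them stay below 2**63 (signed bigint).
    return int.from_bytes(raw, "big")


KEY = advisory_key("HALA")
print(hex(KEY))  # 0x48414c41
```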
# Extra attempts per chunk after the first (1 → at most 2 calls per chunk).
CHUNK_RETRY_ATTEMPTS = 1
# If at least this fraction of chunks crash and the precedent yields zero
# halachot, treat the run as `extraction_failed` rather than `no_halachot`.
# Picked at 0.5 so a precedent that genuinely has no holdings (e.g. a remand
# ruling that just sends the case back) isn't misflagged just because a few
# chunks timed out, while a real rate-limit storm — which kills nearly every
# call — is correctly distinguished and re-tried by the caller.
EXTRACTION_FAILURE_THRESHOLD = 0.5
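The threshold only comes into play when a run leaves chunks without a checkpoint. A sketch of the resulting status decision, mirroring the branch at the end of ``_extract_impl`` but taking plain counts instead of database rows (``classify_run`` is hypothetical):

```python
EXTRACTION_FAILURE_THRESHOLD = 0.5


def classify_run(pending: int, failed: int, still_pending: int, total_stored: int) -> str:
    """Return the status _extract_impl would report for these counts.

    pending       -- chunks attempted this run
    failed        -- chunks whose every attempt crashed or returned a non-list
    still_pending -- chunks left without a checkpoint after the run
    total_stored  -- halachot stored for the precedent (across all runs)
    """
    if still_pending:
        if total_stored == 0 and failed >= pending * EXTRACTION_FAILURE_THRESHOLD:
            return "extraction_failed"  # looks like a rate-limit storm
        return "partial"  # resume later to finish the remaining chunks
    return "completed"
```

For example, ``classify_run(pending=10, failed=9, still_pending=9, total_stored=0)`` returns ``"extraction_failed"``, while the same call with ``failed=2, still_pending=2`` returns ``"partial"``, so a remand ruling that genuinely has no holdings is not misflagged because of a couple of timeouts.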
# Sections from which to extract. facts/intro/appellant_claims/respondent_claims
# never contain holdings, only positions, so we skip them.
EXTRACTABLE_SECTIONS = ("legal_analysis", "ruling", "conclusion")
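``_extract_impl`` asks the database for chunks in these sections and, if none come back, falls back to every chunk of the precedent. The same selection over in-memory rows, assuming each chunk is a dict with a ``section_type`` key (``select_chunks`` is hypothetical; the real filtering happens in ``db.list_precedent_chunks``):

```python
from __future__ import annotations

EXTRACTABLE_SECTIONS = ("legal_analysis", "ruling", "conclusion")


def select_chunks(chunks: list[dict]) -> tuple[list[dict], bool]:
    """Return (chunks_to_extract, used_fallback)."""
    targeted = [c for c in chunks if c.get("section_type") in EXTRACTABLE_SECTIONS]
    if targeted:
        return targeted, False
    # Nothing labeled as a holding-bearing section: over-include rather than
    # silently skip a ruling whose reasoning sits under an unexpected label.
    return list(chunks), bool(chunks)
```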
# Two prompts, chosen by the source's ``is_binding`` flag.
#
# The binding prompt extracts strict halachot (rules a future panel MUST
# follow). It rejects obiter dicta, factual findings, and citations of
# other rulings that the present court only mentioned in passing.
#
# The persuasive prompt is for sources that don't establish binding law
# (most appeals committee decisions, district courts on planning matters,
# etc.). For those, the value is in **how the panel reasoned and applied**
# established law to facts — not in new halachot. The user explicitly
# wants to be able to cite "another committee reached the same conclusion"
# even though it is not binding.
#
# The schema's rule_type field accepts six values:
# binding | interpretive | procedural | obiter | application | persuasive
HALACHA_EXTRACTION_PROMPT_BINDING = """אתה משפטן בכיר המתמחה בדיני תכנון ובניה (ועדות ערר, היטל השבחה, פיצויים לפי סעיף 197 לחוק התכנון והבניה). תפקידך: לחלץ הלכות מחייבות מתוך פסק דין/החלטה משפטית של ערכאה עליונה (עליון / מנהלי).
## הגדרות מחייבות
הלכה (binding rule) = כלל משפטי שהפסק קובע או מאמץ ומיישם, באופן שניתן להסתמך עליו בהחלטות עתידיות.
לא-הלכה (אין לחלץ):
- אמרת אגב (obiter dicta) — הערות שאינן הכרחיות להכרעה.
- ממצאים עובדתיים ספציפיים לתיק ("העורר לא הוכיח X").
- ציטוטי הלכות מפסקי דין אחרים שלא אומצו במפורש בפסק זה.
- הצהרות על דין קיים שאינן מיושמות בהכרעה.
הבחנה קריטית: כאשר הפסק מצטט הלכה מפסק קודם, חלץ אותה רק אם בית המשפט בפסק הנוכחי **מאמץ ומחיל** אותה (לא רק מזכיר אותה ברקע).
## תחומים אפשריים (practice_areas) — תחומי ועדת הערר בלבד
- rishuy_uvniya — רישוי ובניה (תיקי 1xxx: היתרים, שימוש חורג, תכניות, קווי בניין, גובה, חניה)
- betterment_levy — היטל השבחה (תיקי 8xxx: שומה, מערכות, תכניות המקנות בה, מועד קובע, סופיות ההחלטה)
- compensation_197 — פיצויים לפי ס' 197 (תיקי 9xxx: פגיעה במקרקעין, ירידת ערך, ס' 200/פטור)
הלכה אחת יכולה לחול על כמה תחומים — practice_areas הוא array ולא string יחיד.
## סוגי הלכה (rule_type)
- binding — הלכה מחייבת שהוחלה על התיק.
- interpretive — פרשנות סעיף חוק/תכנית שאומצה.
- procedural — כלל פרוצדורלי (סמכות, מועדים, הליכי שמיעה).
- obiter — אמרת אגב חשובה (חלץ רק אם משמעותית; סמן confidence נמוך).
## פלט נדרש
החזר JSON array בלבד, ללא markdown, ללא הסברים. דוגמה:
[
{
"rule_statement": "ניסוח הכלל בלשון משפטית מדויקת בגוף שלישי, 1-3 משפטים.",
"rule_type": "binding",
"reasoning_summary": "תמצית ההיגיון: למה בית המשפט הגיע לכלל הזה (1-2 משפטים).",
"supporting_quote": "ציטוט מילולי מדויק מהפסק התומך בכלל. חייב להופיע מילה במילה בטקסט הקלט.",
"page_reference": "פס' 12 / עמ' 8 — ככל שניתן לזהות מהקלט.",
"practice_areas": ["betterment_levy"],
"subject_tags": ["מועד_קביעת_שומה", "סופיות_ההחלטה"],
"cites": ["עע\\"מ 3975/22"],
"confidence": 0.85
}
]
## כללי איכות
1. **נאמנות מוחלטת לציטוט** — supporting_quote חייב להיות הדבקה מדויקת מהקלט. אם אין ציטוט מתאים — אל תמציא הלכה.
2. **מספר הלכות** — פסק רגיל מכיל 1-4 הלכות מחייבות. אל תמתח את הרשימה. אם אין הלכה — החזר [].
3. **לא לפצל יתר על המידה** — אם שני סעיפים מבטאים את אותו עיקרון, אחד את הניסוח.
4. **שפה** — rule_statement בעברית משפטית מקצועית, לא צמצום מילולי של הציטוט.
5. **subject_tags** — 2-5 תגיות בעברית, snake_case (חניה, קווי_בניין, שיקול_דעת, פגם_פרוצדורלי, סמכות, מועדים, פגיעה_במקרקעין, ירידת_ערך).
6. **confidence** — 0..1. מתחת ל-0.7 = ספק לגבי היות זה הלכה מחייבת.
"""
HALACHA_EXTRACTION_PROMPT_PERSUASIVE = """אתה משפטן בכיר המתמחה בדיני תכנון ובניה. תפקידך: לחלץ עקרונות, יישומים ומסקנות מתוך החלטה של ועדת ערר אחרת או של בית משפט שאינו ערכאה עליונה לסוגיה.
## חשוב — מה לחלץ ומה לא
המקור הזה **אינו** מקור להלכות מחייבות חדשות (binding rules). הלכות מחייבות מגיעות מהעליון/מנהלי. עם זאת, יש כאן ערך משמעותי שצריך לחלץ — איך הפנל הזה ניתח ויישם את הדין הקיים. כשנכתוב החלטה עתידית, נצטט מהמקור הזה כ"גם ועדת הערר ב-X הגיעה למסקנה דומה" — לא כסמכות מחייבת, אלא כתמיכה משכנעת.
**יש לחלץ:**
- **יישום של הלכה ידועה** (rule_type=`application`) — הפנל החיל הלכה ידועה (של עליון/מנהלי) על עובדות הנידונות. תצטט את ניסוח הכלל **כפי שהוצג כאן** (לא בהכרח כפי שנקבע במקור) ואת התוצאה.
- **עקרון פרשני שאומץ** (rule_type=`interpretive`) — איך הפנל פירש סעיף חוק / תכנית, באופן שניתן לאמץ.
- **כלל פרוצדורלי** (rule_type=`procedural`) — קביעות בנושאי סמכות, מועדים, הליך.
- **מסקנה מנומקת ומשכנעת** (rule_type=`persuasive`) — מסקנה שלמה של הפנל בסוגיה, עם ההיגיון התומך, ניתנת לציטוט כאסמכתא משכנעת.
**אין לחלץ:**
- ממצאים עובדתיים ספציפיים לתיק ("העורר לא הוכיח X").
- ציטוטים מפסקי דין אחרים ללא ניתוח של הפנל.
- אמרות אגב חסרות חשיבות.
## תחומים אפשריים (practice_areas) — תחומי ועדת הערר בלבד
- rishuy_uvniya — רישוי ובניה (תיקי 1xxx: היתרים, שימוש חורג, תכניות, קווי בניין, גובה, חניה)
- betterment_levy — היטל השבחה (תיקי 8xxx: שומה, מערכות, תכניות המקנות בה, מועד קובע, סופיות ההחלטה)
- compensation_197 — פיצויים לפי ס' 197 (תיקי 9xxx: פגיעה במקרקעין, ירידת ערך, ס' 200/פטור)
## פלט נדרש
החזר JSON array בלבד, ללא markdown, ללא הסברים:
[
{
"rule_statement": "ניסוח הכלל / המסקנה / היישום בלשון משפטית מדויקת, 1-3 משפטים.",
"rule_type": "application",
"reasoning_summary": "תמצית ההיגיון של הפנל (1-2 משפטים).",
"supporting_quote": "ציטוט מילולי מדויק מהקלט שתומך בכלל. חייב להופיע מילה במילה.",
"page_reference": "פס' 12 / עמ' 8 — ככל שניתן לזהות.",
"practice_areas": ["betterment_levy"],
"subject_tags": ["מועד_קביעת_שומה", "תכנית_רחביה"],
"cites": ["עע\\"מ 3975/22"],
"confidence": 0.85
}
]
## כללי איכות
1. **נאמנות מוחלטת לציטוט** — supporting_quote חייב להיות הדבקה מדויקת מהקלט. אם אין ציטוט מתאים — אל תוסיף את ההלכה.
2. **מספר הלכות** — החלטה ארוכה של ועדת ערר יכולה להניב 2-8 פריטים (יישומים + מסקנות). אם אין מה לחלץ — החזר [].
3. **rule_type מדויק** — application = יישום הלכה ידועה. interpretive = פרשנות. procedural = פרוצדורה. persuasive = מסקנה כללית בעלת ערך כאסמכתא.
4. **לא לפצל יתר על המידה** — שני סעיפים זהים מבחינה רעיונית = פריט אחד.
5. **שפה** — עברית משפטית מקצועית, גוף שלישי.
6. **subject_tags** — 2-5 תגיות בעברית, snake_case.
7. **confidence** — 0..1. דייק.
"""

_VALID_PRACTICE_AREAS = {"rishuy_uvniya", "betterment_levy", "compensation_197"}
_VALID_RULE_TYPES = {
    "binding", "interpretive", "procedural", "obiter",
    "application", "persuasive",
}


def _normalize_for_comparison(text: str) -> str:
    """Normalize Hebrew text for substring matching.

    Collapses whitespace and unifies the half-dozen Hebrew quote-mark
    variants. The quote part delegates to ``proofreader._fix_hebrew_quotes``
    so we stay consistent with the proofreader pipeline.
    """
    fixed = proofreader._fix_hebrew_quotes(text)
    # Collapse all whitespace (newlines, tabs, multiple spaces) to a single space.
    return re.sub(r"\s+", " ", fixed).strip()
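``proofreader._fix_hebrew_quotes`` is not shown in this file, so the variant table below is an assumption: a minimal stand-in that folds Hebrew gershayim/geresh and typographic quotes to ASCII before collapsing whitespace. For matching, the direction of the mapping does not matter, only that the quote and the source text go through the same one:

```python
import re

# Assumed variant table; the real proofreader may map in the other direction.
_QUOTE_VARIANTS = str.maketrans({
    "\u05F4": '"',  # HEBREW PUNCTUATION GERSHAYIM
    "\u201C": '"',  # LEFT DOUBLE QUOTATION MARK
    "\u201D": '"',  # RIGHT DOUBLE QUOTATION MARK
    "\u201E": '"',  # DOUBLE LOW-9 QUOTATION MARK
    "\u05F3": "'",  # HEBREW PUNCTUATION GERESH
    "\u2018": "'",  # LEFT SINGLE QUOTATION MARK
    "\u2019": "'",  # RIGHT SINGLE QUOTATION MARK
})


def normalize_for_comparison(text: str) -> str:
    """Fold quote variants, then collapse every whitespace run to one space."""
    return re.sub(r"\s+", " ", text.translate(_QUOTE_VARIANTS)).strip()
```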


def _verify_quote(supporting_quote: str, full_text: str) -> bool:
    """Return True if ``supporting_quote`` appears verbatim in ``full_text``
    after Hebrew quote/whitespace normalization.

    The LLM occasionally trims a leading/trailing word from the quote, so
    for quotes of at least 30 normalized characters we also accept a match
    of the inner ~90% (about 5% dropped from each end, at least 2
    characters per side) as a contiguous substring of the source.
    """
    if not supporting_quote.strip():
        return False
    normalized_quote = _normalize_for_comparison(supporting_quote)
    normalized_text = _normalize_for_comparison(full_text)
    if not normalized_quote:
        return False
    if normalized_quote in normalized_text:
        return True
    # Fallback: try the inner 90% of the quote (drops boundary trim).
    if len(normalized_quote) >= 30:
        trim = max(2, len(normalized_quote) // 20)
        inner = normalized_quote[trim:-trim]
        if inner and inner in normalized_text:
            return True
    return False
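The boundary-trim fallback is easiest to see on ASCII text. This standalone copy of the matching rule (``verify_quote`` is a local re-implementation that only collapses whitespace and skips quote-mark folding) accepts a 46-character quote whose first and last two characters drifted, because ``46 // 20`` is 2 and ``max(2, 2)`` trims exactly those characters:

```python
import re


def _norm(text: str) -> str:
    # Whitespace collapsing only; quote-mark folding omitted for brevity.
    return re.sub(r"\s+", " ", text).strip()


def verify_quote(quote: str, source: str) -> bool:
    """Exact match after normalization, else the inner ~90% for quotes >= 30 chars."""
    q, s = _norm(quote), _norm(source)
    if not q:
        return False
    if q in s:
        return True
    if len(q) >= 30:
        trim = max(2, len(q) // 20)  # ~5% per side, at least 2 characters
        inner = q[trim:-trim]
        if inner and inner in s:
            return True
    return False


SOURCE = "The committee held that the levy date is fixed by the plan approval."
print(verify_quote("XXe committee held that the levy date is fixZZ", SOURCE))  # True
```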


def _coerce_halacha(raw: dict, is_binding: bool = True) -> dict | None:
    """Validate and normalize one LLM-returned halacha dict.

    Returns ``None`` if the entry is missing required fields. ``is_binding``
    sets the fallback rule_type when the LLM returned an unknown value
    (``binding`` for binding sources, ``persuasive`` otherwise) and also
    downgrades any ``binding`` entry from a non-binding source to
    ``persuasive`` (never pretend an appeals committee created halacha).
    """
    if not isinstance(raw, dict):
        return None
    rule_statement = (raw.get("rule_statement") or "").strip()
    supporting_quote = (raw.get("supporting_quote") or "").strip()
    if not rule_statement or not supporting_quote:
        return None

    default_rule_type = "binding" if is_binding else "persuasive"
    rule_type = (raw.get("rule_type") or default_rule_type).strip().lower()
    if rule_type not in _VALID_RULE_TYPES:
        rule_type = default_rule_type
    # Guard: don't let a non-binding source produce 'binding' rule_type
    if not is_binding and rule_type == "binding":
        rule_type = "persuasive"

    practice_areas_raw = raw.get("practice_areas") or []
    if isinstance(practice_areas_raw, str):
        practice_areas_raw = [practice_areas_raw]
    practice_areas = [p for p in practice_areas_raw if p in _VALID_PRACTICE_AREAS]

    subject_tags_raw = raw.get("subject_tags") or []
    if isinstance(subject_tags_raw, str):
        subject_tags_raw = [subject_tags_raw]
    subject_tags = [str(t).strip() for t in subject_tags_raw if str(t).strip()]

    cites_raw = raw.get("cites") or []
    if isinstance(cites_raw, str):
        cites_raw = [cites_raw]
    cites = [str(c).strip() for c in cites_raw if str(c).strip()]

    try:
        confidence = float(raw.get("confidence", 0.0))
    except (TypeError, ValueError):
        confidence = 0.0
    confidence = max(0.0, min(1.0, confidence))

    return {
        "rule_statement": rule_statement,
        "rule_type": rule_type,
        "reasoning_summary": (raw.get("reasoning_summary") or "").strip(),
        "supporting_quote": supporting_quote,
        "page_reference": (raw.get("page_reference") or "").strip(),
        "practice_areas": practice_areas,
        "subject_tags": subject_tags,
        "cites": cites,
        "confidence": confidence,
    }
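Most of ``_coerce_halacha`` is defensive normalization of whatever shape the LLM returned. A reduced sketch of three of its rules, with hypothetical helper names: scalar-or-list fields become lists of non-empty strings, ``rule_type`` falls back to a source-dependent default and is never ``binding`` for a non-binding source, and ``confidence`` is parsed and clamped to [0, 1]:

```python
from __future__ import annotations

VALID_RULE_TYPES = {"binding", "interpretive", "procedural", "obiter",
                    "application", "persuasive"}


def as_list(value) -> list[str]:
    """Accept None, a bare string, or a list; return stripped non-empty strings."""
    value = value or []
    if isinstance(value, str):
        value = [value]
    return [str(v).strip() for v in value if str(v).strip()]


def coerce_rule_type(value, is_binding: bool) -> str:
    default = "binding" if is_binding else "persuasive"
    rule_type = (value or default).strip().lower()
    if rule_type not in VALID_RULE_TYPES:
        rule_type = default
    # A non-binding source may never yield a 'binding' rule.
    if not is_binding and rule_type == "binding":
        rule_type = "persuasive"
    return rule_type


def clamp_confidence(value) -> float:
    try:
        c = float(value)
    except (TypeError, ValueError):
        c = 0.0
    return max(0.0, min(1.0, c))
```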


async def _extract_chunk(
    chunk_text: str,
    section_type: str,
    chunk_index: int,
    chunk_total: int,
    context: str,
    is_binding: bool,
    effort: str | None = None,
) -> tuple[list[dict], bool]:
    """Run the halacha extractor on one chunk with retry.

    Returns ``(halachot, succeeded)`` so the caller can distinguish "Claude
    said there are no halachot here" (`(_, True)`) from "every attempt
    crashed/timed out" (`(_, False)`). Without this distinction a precedent
    that hit a rate-limit storm looks identical to one that genuinely has no
    halachot, and gets silently marked `no_halachot`.

    The prompt branches on ``is_binding`` so non-binding sources (other
    appeals committees, district courts) yield application/persuasive
    entries rather than a forced 0-result strict halacha pass.

    ``effort`` overrides ``config.HALACHA_EXTRACT_EFFORT`` for this chunk;
    ``None`` falls back to it.
    """
    base_prompt = (
        HALACHA_EXTRACTION_PROMPT_BINDING if is_binding
        else HALACHA_EXTRACTION_PROMPT_PERSUASIVE
    )
    chunk_label = f" (חלק {chunk_index + 1}/{chunk_total})" if chunk_total > 1 else ""
    # Pass the static instruction prompt as `system` so the SDK path can cache
    # it (5-min ephemeral). Only the per-chunk content varies via `prompt`.
    user_msg = (
        f"## הקלט\n"
        f"סוג קטע: {section_type}\n"
        f"{context}{chunk_label}\n\n"
        f"--- תחילת הטקסט ---\n{chunk_text}\n--- סוף הטקסט ---"
    )
    last_err: Exception | None = None
    for attempt in range(CHUNK_RETRY_ATTEMPTS + 1):
        try:
            result = await claude_session.query_json(
                user_msg,
                system=base_prompt,
                model=config.HALACHA_EXTRACT_MODEL or None,
                effort=(effort or config.HALACHA_EXTRACT_EFFORT) or None,
            )
        except Exception as e:
            last_err = e
            logger.warning(
                "halacha_extractor chunk %d/%d attempt %d raised: %s",
                chunk_index + 1, chunk_total, attempt + 1, e,
            )
            continue
        if isinstance(result, list):
            return result, True
        logger.warning(
            "halacha_extractor chunk %d/%d attempt %d returned non-list (%s)",
            chunk_index + 1, chunk_total, attempt + 1, type(result).__name__,
        )
    logger.error(
        "halacha_extractor chunk %d/%d failed after %d attempts: %s",
        chunk_index + 1, chunk_total, CHUNK_RETRY_ATTEMPTS + 1, last_err,
    )
    return [], False
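The retry contract and the effort fallback can be exercised without Claude. In this sketch ``make_flaky`` builds a hypothetical stand-in for ``claude_session.query_json``, and the two module-level constants play the roles of ``config.HALACHA_EXTRACT_EFFORT`` (``xhigh``) and ``config.HALACHA_BULK_EXTRACT_EFFORT`` (default ``high``, per the commit message above):

```python
from __future__ import annotations

import asyncio

HALACHA_EXTRACT_EFFORT = "xhigh"      # single / interactive default
HALACHA_BULK_EXTRACT_EFFORT = "high"  # bulk queue-drain default
CHUNK_RETRY_ATTEMPTS = 1              # 1 retry → at most 2 calls


def resolve_effort(override: str | None) -> str | None:
    # Same expression as _extract_chunk: override, else default, else None.
    return (override or HALACHA_EXTRACT_EFFORT) or None


async def extract_chunk(query, effort: str | None = None) -> tuple[list, bool]:
    """Return (items, True) on the first list result, ([], False) if all attempts fail."""
    for _attempt in range(CHUNK_RETRY_ATTEMPTS + 1):
        try:
            result = await query(effort=resolve_effort(effort))
        except Exception:
            continue  # the real code logs the error, then retries
        if isinstance(result, list):
            return result, True
    return [], False


def make_flaky(failures: int, payload):
    """Hypothetical fake query: raise `failures` times, then return `payload`."""
    calls: list = []

    async def flaky_query(*, effort):
        calls.append(effort)
        if len(calls) <= failures:
            raise TimeoutError("simulated claude -p timeout")
        return payload

    return flaky_query, calls
```

A successful call that returns ``[]`` yields ``([], True)``, which the caller checkpoints as "no halachot here"; only exhausting every attempt yields ``([], False)``.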


async def extract(case_law_id: UUID | str, force: bool = False,
                  effort: str | None = None) -> dict:
    """Extract halachot from an uploaded precedent, globally serialized.

    ``effort`` overrides the per-chunk LLM effort (default
    ``config.HALACHA_EXTRACT_EFFORT`` = xhigh). Bulk queue-drains pass the
    lighter ``config.HALACHA_BULK_EXTRACT_EFFORT`` to cut wall-clock at scale.

    ``force=False`` (default) RESUMES: chunks already extracted (checkpointed)
    are skipped, so a crash/interrupt never loses completed work or re-pays for
    it. ``force=True`` wipes prior halachot + checkpoints and re-extracts all
    (used by explicit re-extraction).

    Takes a PostgreSQL advisory lock so only ONE extraction runs at a time
    across ALL processes (agent retries + batch ``process_pending`` spawn
    independent OS drivers; an in-process Semaphore can't see them). If another
    extraction already holds the lock this returns ``status='busy'`` and the
    precedent stays pending for the next drain, so no second xhigh run piles on
    (this is the fix for the 2026-05-31 box freeze).

    Returns:
        ``{"status": "...", "extracted": N, "verified": M, "stored": K, ...}``
    """
    if isinstance(case_law_id, str):
        case_law_id = UUID(case_law_id)
    pool = await db.get_pool()
    lock_conn = await pool.acquire()
    try:
        got = await lock_conn.fetchval(
            "SELECT pg_try_advisory_lock($1)", _HALACHA_EXTRACT_LOCK_KEY,
        )
        if not got:
            logger.warning(
                "halacha extract: global lock held by another extraction — "
                "skipping %s (stays pending for next drain)", case_law_id,
            )
            return {
                "status": "busy", "extracted": 0, "stored": 0,
                "case_law_id": str(case_law_id),
            }
        try:
            return await _extract_impl(case_law_id, force=force, effort=effort)
        finally:
            await lock_conn.fetchval(
                "SELECT pg_advisory_unlock($1)", _HALACHA_EXTRACT_LOCK_KEY,
            )
    finally:
        await pool.release(lock_conn)
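``pg_try_advisory_lock`` returns immediately instead of waiting, which is what turns a concurrent call into ``status='busy'`` rather than a queued pile-up. The same control flow can be mimicked inside one process with an ``asyncio.Lock``; this is only an illustration of the non-blocking try-lock shape, since an in-process lock cannot serialize separate OS processes (the reason the real code goes through PostgreSQL). ``guarded_extract``, ``slow_extract`` and ``demo`` are hypothetical:

```python
from __future__ import annotations

import asyncio


async def guarded_extract(lock: asyncio.Lock, case_id: str, work) -> dict:
    """Non-blocking 'try lock': report busy instead of waiting for the holder."""
    if lock.locked():
        return {"status": "busy", "extracted": 0, "stored": 0, "case_law_id": case_id}
    # acquire() completes without yielding when the lock is free, so no other
    # task can slip in between the locked() check and the acquire.
    async with lock:
        return await work(case_id)


async def slow_extract(case_id: str) -> dict:
    await asyncio.sleep(0.05)  # pretend to run the chunk pipeline
    return {"status": "completed", "extracted": 1, "stored": 1}


async def demo() -> list[str]:
    lock = asyncio.Lock()
    results = await asyncio.gather(
        guarded_extract(lock, "a", slow_extract),
        guarded_extract(lock, "b", slow_extract),
    )
    return [r["status"] for r in results]
```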


async def _extract_impl(case_law_id: UUID, force: bool = False,
                        effort: str | None = None) -> dict:
    """Core extraction (caller holds the global advisory lock for the duration).

    Crash-safe + resumable: each chunk's halachot are stored AND the chunk is
    checkpointed (``precedent_chunks.halacha_extracted_at``) the moment it
    finishes. A crash/interrupt loses at most the chunks in flight (up to
    ``CHUNK_CONCURRENCY``); a re-run resumes: already-done chunks are skipped,
    failed/pending chunks retried.
    ``force=True`` wipes prior halachot + checkpoints and re-extracts all.
    """
    record = await db.get_case_law(case_law_id)
    if not record:
        return {"status": "not_found", "extracted": 0, "stored": 0}
    is_binding = bool(record.get("is_binding"))

    # Try the targeted sections first (legal_analysis / ruling / conclusion).
    # If the chunker labeled everything as 'other' (common when a ruling
    # uses non-standard headings or the section markers aren't bracketed
    # cleanly), fall back to ALL chunks: better to over-include than to
    # silently skip a ruling that has reasoning under an unexpected label.
    chunks = await db.list_precedent_chunks(
        case_law_id, section_types=EXTRACTABLE_SECTIONS,
    )
    if not chunks:
        chunks = await db.list_precedent_chunks(case_law_id)
        if chunks:
            logger.info(
                "halacha_extractor: case_law=%s — no targeted sections, "
                "falling back to all %d chunks",
                case_law_id, len(chunks),
            )
    if not chunks:
        await db.set_case_law_halacha_status(case_law_id, "completed")
        return {"status": "no_chunks", "extracted": 0, "stored": 0}

    # force = clean slate; otherwise resume (skip already-checkpointed chunks).
    if force:
        await db.reset_halacha_extraction(case_law_id)
        for c in chunks:
            c["halacha_extracted_at"] = None
    await db.set_case_law_halacha_status(case_law_id, "processing")
    pending = [c for c in chunks if c.get("halacha_extracted_at") is None]

    # Legacy guard: a precedent extracted before V25 has halachot but NO chunk
    # checkpoints. Re-extracting (append-per-chunk) would DUPLICATE them. If
    # nothing is checkpointed yet but halachot already exist, backfill the
    # checkpoints and treat as complete instead of re-extracting.
    if not force and len(pending) == len(chunks):
        already = await db.list_halachot(case_law_id=case_law_id, limit=1)
        if already:
            await db.mark_all_chunks_extracted(case_law_id)
            total = len(await db.list_halachot(case_law_id=case_law_id, limit=10_000))
            await db.set_case_law_halacha_status(case_law_id, "completed")
            logger.info(
                "halacha_extractor: case_law=%s legacy-backfill — %d existing "
                "halachot, checkpoints backfilled (no re-extract).",
                case_law_id, total,
            )
            return {"status": "completed", "extracted": total, "stored": total,
                    "legacy_backfill": True, "total_chunks": len(chunks)}

    if not pending:
        # Resume found nothing left: every chunk is already extracted.
        total = len(await db.list_halachot(case_law_id=case_law_id, limit=10_000))
        await db.set_case_law_halacha_status(case_law_id, "completed")
        return {"status": "completed", "extracted": total, "stored": total,
                "resumed": True, "total_chunks": len(chunks)}

    full_text = record.get("full_text") or ""
    citation = record.get("case_number", "")
    court = record.get("court", "")
    date_str = str(record.get("date") or "")
    context = f"מקור: {citation}, {court}, {date_str}"
    idx_by_id = {c["id"]: i for i, c in enumerate(chunks)}
    sem = asyncio.Semaphore(CHUNK_CONCURRENCY)
    store_lock = asyncio.Lock()  # serialize per-chunk stores (index continuity)
    stored_total = 0
    failed_chunks = 0

    async def _process(chunk_row: dict) -> None:
        nonlocal stored_total, failed_chunks
        async with sem:
            items, ok = await _extract_chunk(
                chunk_row["content"], chunk_row["section_type"],
                idx_by_id[chunk_row["id"]], len(chunks), context, is_binding,
                effort,
            )
            if not ok:
                failed_chunks += 1  # leave chunk un-checkpointed → retried on resume
                return
            cleaned: list[dict] = []
            for raw in items:
                coerced = _coerce_halacha(raw, is_binding=is_binding)
                if coerced is None:
                    continue
                coerced["quote_verified"] = _verify_quote(
                    coerced["supporting_quote"], full_text,
                )
                cleaned.append(coerced)
            if cleaned:
                # Rule + reasoning, newline-separated; strip() drops the
                # separator when reasoning_summary is empty.
                embed_inputs = [
                    f"{h['rule_statement']}\n{h['reasoning_summary']}".strip()
                    for h in cleaned
                ]
                try:
                    vectors = await embeddings.embed_texts(embed_inputs, input_type="document")
                except Exception as e:
                    logger.error("halacha_extractor: embeddings failed: %s", e)
                    vectors = [None] * len(cleaned)
                for h, vec in zip(cleaned, vectors):
                    h["embedding"] = vec
            # Store this chunk's halachot AND checkpoint the chunk, atomically
            # (also when it yielded nothing, so it is not retried forever).
            async with store_lock:
                stored_total += await db.store_halachot_for_chunk(
                    case_law_id, chunk_row["id"], cleaned,
                )

    await asyncio.gather(*[_process(c) for c in pending])

    # Decide final status from what's LEFT (re-read checkpoints).
    after = await db.list_precedent_chunks(case_law_id, section_types=EXTRACTABLE_SECTIONS)
    if not after:
        after = await db.list_precedent_chunks(case_law_id)
    still_pending = sum(1 for c in after if c.get("halacha_extracted_at") is None)
    total = len(await db.list_halachot(case_law_id=case_law_id, limit=10_000))
    if still_pending:
        # Some chunks failed this run. Leave status 'processing' so a resume
        # continues them (no progress is lost; done chunks are checkpointed).
        if total == 0 and failed_chunks >= len(pending) * EXTRACTION_FAILURE_THRESHOLD:
            logger.error(
                "halacha_extractor: case_law=%s extraction_failed — %d/%d pending "
                "chunks failed, 0 stored. status left 'processing' for retry.",
                case_law_id, failed_chunks, len(pending),
            )
            return {"status": "extraction_failed", "extracted": 0, "stored": 0,
                    "failed_chunks": failed_chunks, "pending_chunks": still_pending,
                    "total_chunks": len(chunks)}
        logger.warning(
            "halacha_extractor: case_law=%s partial — %d chunks still pending, "
            "%d halachot stored so far. status 'processing' (resume to finish).",
            case_law_id, still_pending, total,
        )
        return {"status": "partial", "extracted": total, "stored": stored_total,
                "pending_chunks": still_pending, "total_chunks": len(chunks)}

    # All chunks done.
    stored = total
    verified = sum(1 for h in await db.list_halachot(case_law_id=case_law_id, limit=10_000)
                   if h.get("quote_verified"))
    await db.set_case_law_halacha_status(case_law_id, "completed")
    logger.info(
        "halacha_extractor: case_law=%s completed — %d halachot stored "
        "(%d new this run), %d quote-verified, %d chunks",
        case_law_id, total, stored_total, verified, len(chunks),
    )
    return {
        "status": "completed",
        "extracted": total,
        "verified": verified,
        "stored": stored,
        "stored_this_run": stored_total,
        "total_chunks": len(chunks),
    }
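The resume model reduces to three rules: only chunks without a checkpoint are processed, a chunk is checkpointed in the same step that stores its results (even when it yielded nothing), and a failed chunk is left unmarked for the next pass. An in-memory sketch under those assumptions, where a dict stands in for ``precedent_chunks.halacha_extracted_at`` and ``run_once``/``make_flaky_extractor`` are hypothetical; the ``extraction_failed`` and legacy-backfill branches are left out:

```python
from __future__ import annotations

import asyncio


async def run_once(checkpoints: dict[int, bool], store: list[str], extract_fn,
                   concurrency: int = 2) -> dict:
    """One pass: extract every chunk whose checkpoint is still False."""
    pending = [cid for cid, done in checkpoints.items() if not done]
    if not pending:
        # Nothing left: report completion without calling the LLM.
        return {"status": "completed", "resumed": True, "stored": len(store)}
    sem = asyncio.Semaphore(concurrency)
    store_lock = asyncio.Lock()
    failed = 0

    async def process(cid: int) -> None:
        nonlocal failed
        async with sem:
            items, ok = await extract_fn(cid)
            if not ok:
                failed += 1  # no checkpoint, so the next pass retries this chunk
                return
            async with store_lock:
                store.extend(items)      # may be empty; still checkpointed
                checkpoints[cid] = True  # stored and checkpointed together

    await asyncio.gather(*(process(c) for c in pending))
    still_pending = sum(1 for done in checkpoints.values() if not done)
    return {"status": "partial" if still_pending else "completed",
            "failed": failed, "stored": len(store)}


def make_flaky_extractor(fail_first: set[int], empty: set[int]):
    """Hypothetical fake LLM: chunks in `fail_first` fail once, `empty` yield []."""
    attempts: dict[int, int] = {}

    async def extract_fn(cid: int) -> tuple[list[str], bool]:
        attempts[cid] = attempts.get(cid, 0) + 1
        await asyncio.sleep(0)
        if cid in fail_first and attempts[cid] == 1:
            return [], False
        if cid in empty:
            return [], True
        return [f"rule-{cid}"], True

    return extract_fn, attempts
```

Running ``run_once`` repeatedly converges: with three chunks where chunk 2 fails once and chunk 3 has no halachot, the first pass reports ``partial`` with chunks 1 and 3 checkpointed, the second pass retries only chunk 2, and a third pass returns immediately with ``resumed: True``.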