Merge pull request 'feat(halacha): חילוץ מצטבר crash-safe + resume (A + resume)' (#31) from feat/halacha-incremental-resume into main
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 3m5s
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 3m5s
This commit was merged in pull request #31.
This commit is contained in:
@@ -1149,6 +1149,14 @@ CREATE TABLE IF NOT EXISTS halacha_citation_corroboration (
|
|||||||
CREATE INDEX IF NOT EXISTS idx_hcc_halacha ON halacha_citation_corroboration(halacha_id);
|
CREATE INDEX IF NOT EXISTS idx_hcc_halacha ON halacha_citation_corroboration(halacha_id);
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
SCHEMA_V25_SQL = """
|
||||||
|
-- Crash-safe halacha extraction: per-chunk checkpoint enables incremental store
|
||||||
|
-- + resume. A chunk with halacha_extracted_at set has been processed; a resumed
|
||||||
|
-- run skips it (so a crash never loses completed chunks or re-pays for them).
|
||||||
|
ALTER TABLE precedent_chunks
|
||||||
|
ADD COLUMN IF NOT EXISTS halacha_extracted_at TIMESTAMPTZ;
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
async def _run_schema_migrations(pool: asyncpg.Pool) -> None:
|
async def _run_schema_migrations(pool: asyncpg.Pool) -> None:
|
||||||
async with pool.acquire() as conn:
|
async with pool.acquire() as conn:
|
||||||
@@ -1177,7 +1185,8 @@ async def _run_schema_migrations(pool: asyncpg.Pool) -> None:
|
|||||||
await conn.execute(SCHEMA_V22_SQL)
|
await conn.execute(SCHEMA_V22_SQL)
|
||||||
await conn.execute(SCHEMA_V23_SQL)
|
await conn.execute(SCHEMA_V23_SQL)
|
||||||
await conn.execute(SCHEMA_V24_SQL)
|
await conn.execute(SCHEMA_V24_SQL)
|
||||||
logger.info("Database schema initialized (v1-v24)")
|
await conn.execute(SCHEMA_V25_SQL)
|
||||||
|
logger.info("Database schema initialized (v1-v25)")
|
||||||
|
|
||||||
|
|
||||||
async def init_schema() -> None:
|
async def init_schema() -> None:
|
||||||
@@ -3199,7 +3208,8 @@ async def list_precedent_chunks(
|
|||||||
pool = await get_pool()
|
pool = await get_pool()
|
||||||
if section_types:
|
if section_types:
|
||||||
rows = await pool.fetch(
|
rows = await pool.fetch(
|
||||||
"""SELECT id, chunk_index, content, section_type, page_number
|
"""SELECT id, chunk_index, content, section_type, page_number,
|
||||||
|
halacha_extracted_at
|
||||||
FROM precedent_chunks
|
FROM precedent_chunks
|
||||||
WHERE case_law_id = $1 AND section_type = ANY($2::text[])
|
WHERE case_law_id = $1 AND section_type = ANY($2::text[])
|
||||||
ORDER BY chunk_index""",
|
ORDER BY chunk_index""",
|
||||||
@@ -3207,7 +3217,8 @@ async def list_precedent_chunks(
|
|||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
rows = await pool.fetch(
|
rows = await pool.fetch(
|
||||||
"""SELECT id, chunk_index, content, section_type, page_number
|
"""SELECT id, chunk_index, content, section_type, page_number,
|
||||||
|
halacha_extracted_at
|
||||||
FROM precedent_chunks
|
FROM precedent_chunks
|
||||||
WHERE case_law_id = $1
|
WHERE case_law_id = $1
|
||||||
ORDER BY chunk_index""",
|
ORDER BY chunk_index""",
|
||||||
@@ -3280,6 +3291,89 @@ async def store_halachot(case_law_id: UUID, halachot: list[dict]) -> int:
|
|||||||
return len(halachot)
|
return len(halachot)
|
||||||
|
|
||||||
|
|
||||||
|
async def reset_halacha_extraction(case_law_id: UUID) -> None:
    """Prepare a precedent for a full, from-scratch halacha extraction.

    Deletes every ``halachot`` row of the precedent and resets
    ``halacha_extracted_at`` to NULL on all of its chunks, so no chunk is
    treated as already processed. Both statements run inside one transaction.
    Intended for an explicit re-extract, not for a resume.

    Args:
        case_law_id: Identifier of the precedent to reset.
    """
    delete_halachot_sql = "DELETE FROM halachot WHERE case_law_id = $1"
    clear_checkpoints_sql = (
        "UPDATE precedent_chunks SET halacha_extracted_at = NULL "
        "WHERE case_law_id = $1"
    )

    pool = await get_pool()
    # One transaction: the halachot and their checkpoints are cleared together.
    async with pool.acquire() as conn, conn.transaction():
        await conn.execute(delete_halachot_sql, case_law_id)
        await conn.execute(clear_checkpoints_sql, case_law_id)
|
||||||
|
|
||||||
|
|
||||||
|
async def mark_all_chunks_extracted(case_law_id: UUID) -> int:
    """Stamp every not-yet-checkpointed chunk of a precedent as extracted.

    Backfill for precedents whose halachot were stored before the per-chunk
    checkpoint column existed: marking their chunks lets a resume run skip
    them rather than extract (and thereby duplicate) the halachot again.

    Args:
        case_law_id: Identifier of the precedent whose chunks are marked.

    Returns:
        The number of rows updated, taken from the last token of the command
        status returned by ``pool.execute`` (expected to look like
        ``"UPDATE <n>"``); 0 when that token cannot be parsed as an integer.
    """
    pool = await get_pool()
    status = await pool.execute(
        "UPDATE precedent_chunks SET halacha_extracted_at = now() "
        "WHERE case_law_id = $1 AND halacha_extracted_at IS NULL",
        case_law_id,
    )
    try:
        updated = int(status.split()[-1])
    except (ValueError, IndexError):
        # Empty or unexpected status text: report no rows rather than fail.
        updated = 0
    return updated
|
||||||
|
|
||||||
|
|
||||||
|
async def store_halachot_for_chunk(
    case_law_id: UUID, chunk_id: UUID, halachot: list[dict],
) -> int:
    """Insert one chunk's halachot and checkpoint that chunk in one transaction.

    Each halacha is appended with a ``halacha_index`` that continues from the
    precedent's current maximum, so rows stored for different chunks do not
    collide. Rows whose confidence reaches
    ``config.HALACHA_AUTO_APPROVE_THRESHOLD`` are stored as ``approved`` with
    an auto-approval reviewer note and ``reviewed_at = now()``; all others are
    stored as ``pending_review`` with no reviewer. The chunk's
    ``halacha_extracted_at`` is set even when ``halachot`` is empty, so a
    resumed run also skips chunks that yielded nothing.

    The MAX read is only race-free if calls are serialized; the caller is
    expected to hold a single in-process store lock around this function.

    Args:
        case_law_id: Precedent the halachot belong to.
        chunk_id: Chunk to mark as extracted.
        halachot: Halacha dicts; ``rule_statement`` and ``supporting_quote``
            are required keys, the remaining fields fall back to defaults.

    Returns:
        The number of halachot passed in (``len(halachot)``).
    """
    auto_threshold = config.HALACHA_AUTO_APPROVE_THRESHOLD
    pool = await get_pool()
    async with pool.acquire() as conn, conn.transaction():
        next_index = await conn.fetchval(
            "SELECT COALESCE(MAX(halacha_index), -1) + 1 FROM halachot "
            "WHERE case_law_id = $1", case_law_id,
        )
        for offset, item in enumerate(halachot):
            confidence = float(item.get("confidence", 0.0))
            if confidence >= auto_threshold:
                review_status = "approved"
                reviewer = f"auto-approved (confidence ≥ {auto_threshold:.2f})"
                reviewed_at_clause = "now()"
            else:
                review_status = "pending_review"
                reviewer = None
                reviewed_at_clause = "NULL"

            row_args = (
                case_law_id, next_index + offset, item["rule_statement"],
                item.get("rule_type", "binding"), item.get("reasoning_summary", ""),
                item["supporting_quote"], item.get("page_reference", ""),
                item.get("practice_areas", []), item.get("subject_tags", []),
                item.get("cites", []), confidence, item.get("quote_verified", False),
                item.get("embedding"), review_status, reviewer,
            )
            # reviewed_at_clause is one of two fixed SQL literals chosen above,
            # never caller data, so interpolating it into the statement is safe.
            await conn.execute(
                f"""INSERT INTO halachot
                    (case_law_id, halacha_index, rule_statement, rule_type,
                     reasoning_summary, supporting_quote, page_reference,
                     practice_areas, subject_tags, cites, confidence,
                     quote_verified, embedding, review_status,
                     reviewer, reviewed_at)
                   VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11,
                           $12, $13, $14, $15, {reviewed_at_clause})""",
                *row_args,
            )

        # Checkpoint inside the same transaction as the inserts.
        await conn.execute(
            "UPDATE precedent_chunks SET halacha_extracted_at = now() "
            "WHERE id = $1", chunk_id,
        )
    return len(halachot)
|
||||||
|
|
||||||
|
|
||||||
async def list_halachot(
|
async def list_halachot(
|
||||||
case_law_id: UUID | None = None,
|
case_law_id: UUID | None = None,
|
||||||
review_status: str | None = None,
|
review_status: str | None = None,
|
||||||
|
|||||||
@@ -342,9 +342,14 @@ async def _extract_chunk(
|
|||||||
return [], False
|
return [], False
|
||||||
|
|
||||||
|
|
||||||
async def extract(case_law_id: UUID | str) -> dict:
|
async def extract(case_law_id: UUID | str, force: bool = False) -> dict:
|
||||||
"""Extract halachot from an uploaded precedent — globally serialized.
|
"""Extract halachot from an uploaded precedent — globally serialized.
|
||||||
|
|
||||||
|
``force=False`` (default) RESUMES: chunks already extracted (checkpointed)
|
||||||
|
are skipped, so a crash/interrupt never loses completed work or re-pays for
|
||||||
|
it. ``force=True`` wipes prior halachot + checkpoints and re-extracts all
|
||||||
|
(used by explicit re-extraction).
|
||||||
|
|
||||||
Takes a PostgreSQL advisory lock so only ONE extraction runs at a time
|
Takes a PostgreSQL advisory lock so only ONE extraction runs at a time
|
||||||
across ALL processes (agent retries + batch ``process_pending`` spawn
|
across ALL processes (agent retries + batch ``process_pending`` spawn
|
||||||
independent OS drivers; an in-process Semaphore can't see them). If another
|
independent OS drivers; an in-process Semaphore can't see them). If another
|
||||||
@@ -374,7 +379,7 @@ async def extract(case_law_id: UUID | str) -> dict:
|
|||||||
"case_law_id": str(case_law_id),
|
"case_law_id": str(case_law_id),
|
||||||
}
|
}
|
||||||
try:
|
try:
|
||||||
return await _extract_impl(case_law_id)
|
return await _extract_impl(case_law_id, force=force)
|
||||||
finally:
|
finally:
|
||||||
await lock_conn.fetchval(
|
await lock_conn.fetchval(
|
||||||
"SELECT pg_advisory_unlock($1)", _HALACHA_EXTRACT_LOCK_KEY,
|
"SELECT pg_advisory_unlock($1)", _HALACHA_EXTRACT_LOCK_KEY,
|
||||||
@@ -383,11 +388,14 @@ async def extract(case_law_id: UUID | str) -> dict:
|
|||||||
await pool.release(lock_conn)
|
await pool.release(lock_conn)
|
||||||
|
|
||||||
|
|
||||||
async def _extract_impl(case_law_id: UUID) -> dict:
|
async def _extract_impl(case_law_id: UUID, force: bool = False) -> dict:
|
||||||
"""Core extraction (caller holds the global advisory lock for the duration).
|
"""Core extraction (caller holds the global advisory lock for the duration).
|
||||||
|
|
||||||
Idempotent: replaces any existing halachot for this case_law_id.
|
Crash-safe + resumable: each chunk's halachot are stored AND the chunk is
|
||||||
All inserted rows start as ``review_status='pending_review'``.
|
checkpointed (``precedent_chunks.halacha_extracted_at``) the moment it
|
||||||
|
finishes. A crash/interrupt loses at most the in-flight chunk; a re-run
|
||||||
|
resumes — already-done chunks are skipped, failed/pending chunks retried.
|
||||||
|
``force=True`` wipes prior halachot + checkpoints and re-extracts all.
|
||||||
"""
|
"""
|
||||||
record = await db.get_case_law(case_law_id)
|
record = await db.get_case_law(case_law_id)
|
||||||
if not record:
|
if not record:
|
||||||
@@ -415,111 +423,135 @@ async def _extract_impl(case_law_id: UUID) -> dict:
|
|||||||
await db.set_case_law_halacha_status(case_law_id, "completed")
|
await db.set_case_law_halacha_status(case_law_id, "completed")
|
||||||
return {"status": "no_chunks", "extracted": 0, "stored": 0}
|
return {"status": "no_chunks", "extracted": 0, "stored": 0}
|
||||||
|
|
||||||
await db.set_case_law_halacha_status(case_law_id, "processing")
|
# force = clean slate; otherwise resume (skip already-checkpointed chunks).
|
||||||
await db.delete_halachot(case_law_id)
|
if force:
|
||||||
|
await db.reset_halacha_extraction(case_law_id)
|
||||||
|
for c in chunks:
|
||||||
|
c["halacha_extracted_at"] = None
|
||||||
|
|
||||||
|
await db.set_case_law_halacha_status(case_law_id, "processing")
|
||||||
|
|
||||||
|
pending = [c for c in chunks if c.get("halacha_extracted_at") is None]
|
||||||
|
|
||||||
|
# Legacy guard: a precedent extracted before V25 has halachot but NO chunk
|
||||||
|
# checkpoints. Re-extracting (append-per-chunk) would DUPLICATE them. If
|
||||||
|
# nothing is checkpointed yet but halachot already exist, backfill the
|
||||||
|
# checkpoints and treat as complete instead of re-extracting.
|
||||||
|
if not force and len(pending) == len(chunks):
|
||||||
|
already = await db.list_halachot(case_law_id=case_law_id, limit=1)
|
||||||
|
if already:
|
||||||
|
await db.mark_all_chunks_extracted(case_law_id)
|
||||||
|
total = len(await db.list_halachot(case_law_id=case_law_id, limit=10_000))
|
||||||
|
await db.set_case_law_halacha_status(case_law_id, "completed")
|
||||||
|
logger.info(
|
||||||
|
"halacha_extractor: case_law=%s legacy-backfill — %d existing "
|
||||||
|
"halachot, checkpoints backfilled (no re-extract).",
|
||||||
|
case_law_id, total,
|
||||||
|
)
|
||||||
|
return {"status": "completed", "extracted": total, "stored": total,
|
||||||
|
"legacy_backfill": True, "total_chunks": len(chunks)}
|
||||||
|
|
||||||
|
if not pending:
|
||||||
|
# Resume found nothing left — every chunk already extracted.
|
||||||
|
total = len(await db.list_halachot(case_law_id=case_law_id, limit=10_000))
|
||||||
|
await db.set_case_law_halacha_status(case_law_id, "completed")
|
||||||
|
return {"status": "completed", "extracted": total, "stored": total,
|
||||||
|
"resumed": True, "total_chunks": len(chunks)}
|
||||||
|
|
||||||
|
full_text = record.get("full_text") or ""
|
||||||
citation = record.get("case_number", "")
|
citation = record.get("case_number", "")
|
||||||
court = record.get("court", "")
|
court = record.get("court", "")
|
||||||
date_str = str(record.get("date") or "")
|
date_str = str(record.get("date") or "")
|
||||||
context = f"מקור: {citation} — {court}, {date_str}"
|
context = f"מקור: {citation} — {court}, {date_str}"
|
||||||
|
idx_by_id = {c["id"]: i for i, c in enumerate(chunks)}
|
||||||
|
|
||||||
sem = asyncio.Semaphore(CHUNK_CONCURRENCY)
|
sem = asyncio.Semaphore(CHUNK_CONCURRENCY)
|
||||||
|
store_lock = asyncio.Lock() # serialize per-chunk stores (index continuity)
|
||||||
|
stored_total = 0
|
||||||
|
failed_chunks = 0
|
||||||
|
|
||||||
async def _bounded(idx: int, chunk_row: dict) -> tuple[list[dict], bool]:
|
async def _process(chunk_row: dict) -> None:
|
||||||
|
nonlocal stored_total, failed_chunks
|
||||||
async with sem:
|
async with sem:
|
||||||
return await _extract_chunk(
|
items, ok = await _extract_chunk(
|
||||||
chunk_row["content"], chunk_row["section_type"],
|
chunk_row["content"], chunk_row["section_type"],
|
||||||
idx, len(chunks), context, is_binding,
|
idx_by_id[chunk_row["id"]], len(chunks), context, is_binding,
|
||||||
|
)
|
||||||
|
if not ok:
|
||||||
|
failed_chunks += 1 # leave chunk un-checkpointed → retried on resume
|
||||||
|
return
|
||||||
|
cleaned: list[dict] = []
|
||||||
|
for raw in items:
|
||||||
|
coerced = _coerce_halacha(raw, is_binding=is_binding)
|
||||||
|
if coerced is None:
|
||||||
|
continue
|
||||||
|
coerced["quote_verified"] = _verify_quote(
|
||||||
|
coerced["supporting_quote"], full_text,
|
||||||
|
)
|
||||||
|
cleaned.append(coerced)
|
||||||
|
if cleaned:
|
||||||
|
embed_inputs = [
|
||||||
|
f"{h['rule_statement']} — {h['reasoning_summary']}".strip(" —")
|
||||||
|
for h in cleaned
|
||||||
|
]
|
||||||
|
try:
|
||||||
|
vectors = await embeddings.embed_texts(embed_inputs, input_type="document")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("halacha_extractor: embeddings failed: %s", e)
|
||||||
|
vectors = [None] * len(cleaned)
|
||||||
|
for h, vec in zip(cleaned, vectors):
|
||||||
|
h["embedding"] = vec
|
||||||
|
# Store this chunk's halachot AND checkpoint the chunk, atomically.
|
||||||
|
async with store_lock:
|
||||||
|
stored_total += await db.store_halachot_for_chunk(
|
||||||
|
case_law_id, chunk_row["id"], cleaned,
|
||||||
)
|
)
|
||||||
|
|
||||||
chunk_results = await asyncio.gather(
|
await asyncio.gather(*[_process(c) for c in pending])
|
||||||
*[_bounded(i, c) for i, c in enumerate(chunks)]
|
|
||||||
)
|
|
||||||
raw_halachot: list[dict] = []
|
|
||||||
failed_chunks = 0
|
|
||||||
for items, ok in chunk_results:
|
|
||||||
raw_halachot.extend(items)
|
|
||||||
if not ok:
|
|
||||||
failed_chunks += 1
|
|
||||||
|
|
||||||
# If most chunks failed (rate limit storm, claude_session crash, etc.)
|
# Decide final status from what's LEFT (re-read checkpoints).
|
||||||
# do NOT touch the DB status — leave it 'processing' so the caller can
|
after = await db.list_precedent_chunks(case_law_id, section_types=EXTRACTABLE_SECTIONS)
|
||||||
# retry without the request falling out of the queue. The caller
|
if not after:
|
||||||
# (`process_pending_extractions`) is responsible for either retrying or
|
after = await db.list_precedent_chunks(case_law_id)
|
||||||
# finalising the status as 'failed' after retries are exhausted. This
|
still_pending = sum(1 for c in after if c.get("halacha_extracted_at") is None)
|
||||||
# is the bug that produced 317/10's silent `no_halachot` after a
|
total = len(await db.list_halachot(case_law_id=case_law_id, limit=10_000))
|
||||||
# 129-chunk neighbour saturated the API.
|
|
||||||
failure_rate = failed_chunks / len(chunks) if chunks else 0
|
if still_pending:
|
||||||
if failure_rate >= EXTRACTION_FAILURE_THRESHOLD and not raw_halachot:
|
# Some chunks failed this run. Leave status 'processing' so a resume
|
||||||
logger.error(
|
# continues them (no progress is lost — done chunks are checkpointed).
|
||||||
"halacha_extractor: case_law=%s extraction_failed — "
|
if total == 0 and failed_chunks >= len(pending) * EXTRACTION_FAILURE_THRESHOLD:
|
||||||
"%d/%d chunks failed (rate=%.0f%%), no halachot retrieved. "
|
logger.error(
|
||||||
"DB status left as 'processing' for caller-level retry.",
|
"halacha_extractor: case_law=%s extraction_failed — %d/%d pending "
|
||||||
case_law_id, failed_chunks, len(chunks), failure_rate * 100,
|
"chunks failed, 0 stored. status left 'processing' for retry.",
|
||||||
|
case_law_id, failed_chunks, len(pending),
|
||||||
|
)
|
||||||
|
return {"status": "extraction_failed", "extracted": 0, "stored": 0,
|
||||||
|
"failed_chunks": failed_chunks, "pending_chunks": still_pending,
|
||||||
|
"total_chunks": len(chunks)}
|
||||||
|
logger.warning(
|
||||||
|
"halacha_extractor: case_law=%s partial — %d chunks still pending, "
|
||||||
|
"%d halachot stored so far. status 'processing' (resume to finish).",
|
||||||
|
case_law_id, still_pending, total,
|
||||||
)
|
)
|
||||||
return {
|
return {"status": "partial", "extracted": total, "stored": stored_total,
|
||||||
"status": "extraction_failed",
|
"pending_chunks": still_pending, "total_chunks": len(chunks)}
|
||||||
"extracted": 0,
|
|
||||||
"stored": 0,
|
|
||||||
"failed_chunks": failed_chunks,
|
|
||||||
"total_chunks": len(chunks),
|
|
||||||
}
|
|
||||||
|
|
||||||
if not raw_halachot:
|
# All chunks done.
|
||||||
await db.set_case_law_halacha_status(case_law_id, "completed")
|
stored = total
|
||||||
return {
|
verified = sum(1 for h in await db.list_halachot(case_law_id=case_law_id, limit=10_000)
|
||||||
"status": "no_halachot",
|
if h.get("quote_verified"))
|
||||||
"extracted": 0,
|
|
||||||
"stored": 0,
|
|
||||||
"failed_chunks": failed_chunks,
|
|
||||||
"total_chunks": len(chunks),
|
|
||||||
}
|
|
||||||
|
|
||||||
# Validate against the full text of the precedent for the quote check.
|
|
||||||
full_text = record.get("full_text") or ""
|
|
||||||
|
|
||||||
cleaned: list[dict] = []
|
|
||||||
for raw in raw_halachot:
|
|
||||||
coerced = _coerce_halacha(raw, is_binding=is_binding)
|
|
||||||
if coerced is None:
|
|
||||||
continue
|
|
||||||
coerced["quote_verified"] = _verify_quote(
|
|
||||||
coerced["supporting_quote"], full_text,
|
|
||||||
)
|
|
||||||
cleaned.append(coerced)
|
|
||||||
|
|
||||||
if not cleaned:
|
|
||||||
await db.set_case_law_halacha_status(case_law_id, "completed")
|
|
||||||
return {"status": "no_valid_halachot", "extracted": len(raw_halachot), "stored": 0}
|
|
||||||
|
|
||||||
# Embed rule_statement + reasoning_summary so semantic search hits the
|
|
||||||
# rule directly rather than the surrounding chunk centroid.
|
|
||||||
embed_inputs = [
|
|
||||||
f"{h['rule_statement']} — {h['reasoning_summary']}".strip(" —")
|
|
||||||
for h in cleaned
|
|
||||||
]
|
|
||||||
try:
|
|
||||||
vectors = await embeddings.embed_texts(embed_inputs, input_type="document")
|
|
||||||
except Exception as e:
|
|
||||||
logger.error("halacha_extractor: embeddings failed: %s", e)
|
|
||||||
vectors = [None] * len(cleaned)
|
|
||||||
|
|
||||||
for halacha, vec in zip(cleaned, vectors):
|
|
||||||
halacha["embedding"] = vec
|
|
||||||
|
|
||||||
stored = await db.store_halachot(case_law_id, cleaned)
|
|
||||||
|
|
||||||
verified = sum(1 for h in cleaned if h["quote_verified"])
|
|
||||||
await db.set_case_law_halacha_status(case_law_id, "completed")
|
await db.set_case_law_halacha_status(case_law_id, "completed")
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"halacha_extractor: case_law=%s extracted=%d cleaned=%d verified=%d stored=%d",
|
"halacha_extractor: case_law=%s completed — %d halachot stored "
|
||||||
case_law_id, len(raw_halachot), len(cleaned), verified, stored,
|
"(%d new this run), %d quote-verified, %d chunks",
|
||||||
|
case_law_id, total, stored_total, verified, len(chunks),
|
||||||
)
|
)
|
||||||
return {
|
return {
|
||||||
"status": "completed",
|
"status": "completed",
|
||||||
"extracted": len(raw_halachot),
|
"extracted": total,
|
||||||
"valid": len(cleaned),
|
|
||||||
"verified": verified,
|
"verified": verified,
|
||||||
"stored": stored,
|
"stored": stored,
|
||||||
|
"stored_this_run": stored_total,
|
||||||
|
"total_chunks": len(chunks),
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -156,7 +156,10 @@ async def reextract_halachot(
|
|||||||
# bad data. See note in db.request_metadata_extraction.
|
# bad data. See note in db.request_metadata_extraction.
|
||||||
|
|
||||||
await progress("extracting_halachot", 50, "מחלץ הלכות מחדש")
|
await progress("extracting_halachot", 50, "מחלץ הלכות מחדש")
|
||||||
result = await halacha_extractor.extract(case_law_id)
|
# Explicit re-extraction = clean slate (force): wipe prior halachot +
|
||||||
|
# per-chunk checkpoints and redo all. (Queue draining / resume uses the
|
||||||
|
# default force=False so an interrupted run continues where it stopped.)
|
||||||
|
result = await halacha_extractor.extract(case_law_id, force=True)
|
||||||
# Clear the queue timestamp on completion so the UI badge / worker queue
|
# Clear the queue timestamp on completion so the UI badge / worker queue
|
||||||
# don't keep showing this row. The queue worker (process_pending_extractions)
|
# don't keep showing this row. The queue worker (process_pending_extractions)
|
||||||
# already does this; mirror it here so per-record extraction drains too.
|
# already does this; mirror it here so per-record extraction drains too.
|
||||||
|
|||||||
Reference in New Issue
Block a user