fix(extraction): self-heal stale halacha 'processing' rows + scheduled drainer
The halacha extraction queue was stuck (same class as the metadata issue): 26 precedents had requested extraction with no drainer running, plus 1 row orphaned in 'processing' (status=processing, requested_at cleared → never re-picked by the queue).

- db.requeue_stale_processing_extractions(kind): re-stamps orphaned 'processing' rows (requested_at IS NULL) so they re-drain; the halacha extractor runs with force=False and resumes from chunk checkpoints (no duplicates).
- process_pending_extractions calls it at the top: fully unattended, safe under the global advisory lock. Mirrors the digests-drain self-heal.
- legal-halacha-drain.config.cjs: pm2 cron (every 2h, deliberately conservative: Claude is slow/rate-limited and each run adds to the chair's pending_review queue). drain_halacha_queue.py stays on claude_session (high reasoning quality for holding/ratio; NOT moved to Gemini). SCRIPTS.md updated.

The chair-approval gate (INV-G10) is untouched: this commit only produces halachot; Daphna still approves each one in /approvals.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@@ -5421,6 +5421,34 @@ async def list_pending_extraction_requests(
    return out


async def requeue_stale_processing_extractions(kind: str = "halacha") -> int:
    """Re-stamp orphaned 'processing' rows so they re-drain. Returns count healed.

    A drain that died mid-extraction can leave a row ``status='processing'`` with
    its ``requested_at`` already cleared, i.e. orphaned: the queue selects on
    ``requested_at IS NOT NULL`` so it would never be picked again. We re-stamp
    those (only when requested_at IS NULL, i.e. not an actively-processing row in
    a concurrent run) so the next drain resumes them.
    """
    status_col = (
        "metadata_extraction_status" if kind == "metadata"
        else "halacha_extraction_status"
    )
    req_col = (
        "metadata_extraction_requested_at" if kind == "metadata"
        else "halacha_extraction_requested_at"
    )
    pool = await get_pool()
    tag = await pool.execute(
        f"UPDATE case_law SET {req_col} = now(), {status_col} = 'pending' "
        f"WHERE {status_col} = 'processing' AND {req_col} IS NULL"
    )
    try:
        return int(str(tag).split()[-1])
    except (ValueError, IndexError):
        return 0


async def extraction_queue_status() -> dict:
    """Pending-extraction queue depth per kind (INV-TOOL4 visibility / GAP-45).

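The healed count above is taken from the last token of the status string returned by `pool.execute`. A minimal sketch of that parsing in isolation, assuming the driver returns PostgreSQL-style command tags such as `UPDATE 3` (asyncpg's `execute` does; this diff does not show which driver `get_pool()` wraps). The helper name `rows_affected` is hypothetical and not part of this commit.

```python
def rows_affected(tag: object) -> int:
    """Return the trailing row count of a command tag, or 0 if it has none.

    Hypothetical helper: mirrors the try/except in
    requeue_stale_processing_extractions, nothing more.
    """
    try:
        # "UPDATE 3" -> ["UPDATE", "3"] -> 3; "INSERT 0 1" -> 1
        return int(str(tag).split()[-1])
    except (ValueError, IndexError):
        # ValueError: last token is not a number (e.g. "BEGIN")
        # IndexError: empty string, so split() returned []
        return 0


if __name__ == "__main__":
    for sample in ("UPDATE 3", "INSERT 0 1", "BEGIN", ""):
        print(repr(sample), "->", rows_affected(sample))
```

Falling back to 0 keeps the self-heal step from failing the whole drain if the tag has an unexpected shape; the cost is that an unparseable tag is indistinguishable from "nothing healed" in the log.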
@@ -216,6 +216,16 @@ async def process_pending_extractions(kind: str = "metadata", limit: int = 20) -
    if kind not in {"metadata", "halacha"}:
        raise ValueError("kind must be 'metadata' or 'halacha'")

    # Self-heal stale 'processing' rows (fully unattended): a drain that crashed
    # mid-extraction can leave a row status='processing' with its requested_at
    # cleared, i.e. orphaned, so it would never be re-picked. Re-stamp it so it
    # re-drains (the halacha extractor uses force=False → resumes from chunk
    # checkpoints, no duplicates). Safe under the global advisory lock (only one
    # drain runs at a time). Mirrors the digests-drain self-heal.
    healed = await db.requeue_stale_processing_extractions(kind=kind)
    if healed:
        logger.warning("self-healed %d stale '%s' processing row(s)", healed, kind)

    pending = await db.list_pending_extraction_requests(kind=kind, limit=limit)
    if not pending:
        return {"status": "no_pending", "kind": kind, "processed": 0, "results": []}

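The heal-then-list ordering in this hunk can be illustrated without a database. The sketch below is an in-memory model only: `Row`, `requeue_stale`, and `list_pending` are hypothetical stand-ins, and it assumes (per the docstring in db) that the queue selects rows whose `requested_at` is not NULL.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List, Optional


@dataclass
class Row:
    """Hypothetical stand-in for one case_law row (one extraction kind)."""
    id: int
    status: str
    requested_at: Optional[datetime]


def requeue_stale(rows: List[Row]) -> int:
    """Re-stamp rows stuck in 'processing' with no requested_at; return count."""
    healed = 0
    now = datetime.now(timezone.utc)
    for row in rows:
        if row.status == "processing" and row.requested_at is None:
            row.status = "pending"
            row.requested_at = now
            healed += 1
    return healed


def list_pending(rows: List[Row], limit: int = 20) -> List[Row]:
    """Assumed queue rule: only rows with requested_at set are selected."""
    return [r for r in rows if r.requested_at is not None][:limit]


if __name__ == "__main__":
    stamp = datetime.now(timezone.utc)
    rows = [
        Row(1, "pending", stamp),     # normally queued
        Row(2, "processing", None),   # orphaned by a crashed drain
        Row(3, "completed", None),    # finished, must stay untouched
    ]
    print([r.id for r in list_pending(rows)])  # orphan is invisible here
    print(requeue_stale(rows))                 # heals the orphan
    print([r.id for r in list_pending(rows)])  # orphan is now selectable
    print(requeue_stale(rows))                 # second pass finds nothing
```

Because the heal only matches rows with a NULL `requested_at`, running it again is a no-op, which is what makes it reasonable to call unconditionally at the top of every drain.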