From 97ede1a49d3d858027711027f8e8d2d8ae15b470 Mon Sep 17 00:00:00 2001 From: Chaim Date: Mon, 8 Jun 2026 06:04:53 +0000 Subject: [PATCH] fix(extraction): self-heal stale halacha 'processing' rows + scheduled drainer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The halacha extraction queue was stuck (same class as the metadata issue): 26 precedents requested extraction with no drainer, plus 1 orphaned in 'processing' (status=processing, requested_at cleared → never re-picked by the queue). - db.requeue_stale_processing_extractions(kind): re-stamp orphaned 'processing' rows (requested_at IS NULL) and reset their status to 'pending' so they re-drain; halacha extractor force=False resumes from chunk checkpoints (no duplicates). - process_pending_extractions calls it at the top — fully unattended, safe under the global advisory lock. Mirrors the digests-drain self-heal. - legal-halacha-drain.config.cjs: pm2 cron (every 2h, conservative — Claude is slow/rate-limited and each run adds to the chair's pending_review queue). drain_halacha_queue.py stays on claude_session (high reasoning quality for holding/ratio; NOT moved to Gemini). SCRIPTS.md is updated to document both. The chair-approval gate (INV-G10) is untouched — this only produces halachot; Daphna still approves each in /approvals. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- mcp-server/src/legal_mcp/services/db.py | 28 +++++++++++++ .../legal_mcp/services/precedent_library.py | 10 +++++ scripts/SCRIPTS.md | 3 +- scripts/legal-halacha-drain.config.cjs | 39 +++++++++++++++++++ 4 files changed, 79 insertions(+), 1 deletion(-) create mode 100644 scripts/legal-halacha-drain.config.cjs diff --git a/mcp-server/src/legal_mcp/services/db.py b/mcp-server/src/legal_mcp/services/db.py index 6b908e5..3e775ff 100644 --- a/mcp-server/src/legal_mcp/services/db.py +++ b/mcp-server/src/legal_mcp/services/db.py @@ -5421,6 +5421,34 @@ async def list_pending_extraction_requests( return out +async def requeue_stale_processing_extractions(kind: str = "halacha") -> int: + """Re-stamp orphaned 'processing' rows so they re-drain. Returns count healed. + + A drain that died mid-extraction can leave a row ``status='processing'`` with + its ``requested_at`` already cleared — orphaned: the queue selects on + ``requested_at IS NOT NULL`` so it would never be picked again. We re-stamp + those (only when requested_at IS NULL, i.e. not an actively-processing row in + a concurrent run) so the next drain resumes them. + """ + status_col = ( + "metadata_extraction_status" if kind == "metadata" + else "halacha_extraction_status" + ) + req_col = ( + "metadata_extraction_requested_at" if kind == "metadata" + else "halacha_extraction_requested_at" + ) + pool = await get_pool() + tag = await pool.execute( + f"UPDATE case_law SET {req_col} = now(), {status_col} = 'pending' " + f"WHERE {status_col} = 'processing' AND {req_col} IS NULL" + ) + try: + return int(str(tag).split()[-1]) + except (ValueError, IndexError): + return 0 + + async def extraction_queue_status() -> dict: """Pending-extraction queue depth per kind (INV-TOOL4 visibility / GAP-45). 
diff --git a/mcp-server/src/legal_mcp/services/precedent_library.py b/mcp-server/src/legal_mcp/services/precedent_library.py index e5219ea..fdf2758 100644 --- a/mcp-server/src/legal_mcp/services/precedent_library.py +++ b/mcp-server/src/legal_mcp/services/precedent_library.py @@ -216,6 +216,16 @@ async def process_pending_extractions(kind: str = "metadata", limit: int = 20) - if kind not in {"metadata", "halacha"}: raise ValueError("kind must be 'metadata' or 'halacha'") + # Self-heal stale 'processing' rows (fully unattended): a drain that crashed + # mid-extraction can leave a row status='processing' with its requested_at + # cleared — orphaned, so it would never be re-picked. Re-stamp it so it + # re-drains (the halacha extractor uses force=False → resumes from chunk + # checkpoints, no duplicates). Safe under the global advisory lock (only one + # drain runs at a time). Mirrors the digests-drain self-heal. + healed = await db.requeue_stale_processing_extractions(kind=kind) + if healed: + logger.warning("self-healed %d stale '%s' processing row(s)", healed, kind) + pending = await db.list_pending_extraction_requests(kind=kind, limit=limit) if not pending: return {"status": "no_pending", "kind": kind, "processed": 0, "results": []} diff --git a/scripts/SCRIPTS.md b/scripts/SCRIPTS.md index 86fc7a5..1da6530 100644 --- a/scripts/SCRIPTS.md +++ b/scripts/SCRIPTS.md @@ -94,7 +94,8 @@ | `run_curator_deepseek_test_v2.sh` | A/B test #2 (2026-05-05) — אותו run אבל עם interaction. תוצאה: 9:08 דק׳, 5 ממצאים, היחיד מ-4 הריצות שזיהה תוצאה עובדתית נכונה (קבלה חלקית). interaction נכשל ב-API ("Agent run id required" בריצה ידנית). | החלפת Curator לאדפטר DeepSeek מקומי | | `run_curator_sonnet_rerun.sh` | A/B test #3 (2026-05-05) — ריצה חוזרת של Sonnet 4.5 על אותו CMP-78. תוצאה: 12:52 דק׳ (לעומת 20:13 בריצה המקורית — כי בלי לולאת interaction.json). זיהה תוצאה שגויה ("דחייה") **בעקביות עם הריצה המקורית** — Sonnet עקבי-בטעות, DeepSeek אקראי. 
| בדיקה חד-פעמית — לא להריץ שוב | | `ingest_incoming_batch.py` | python | קליטת batch של החלטות ועדת ערר מ-`data/precedents/incoming/` דרך המסלול הקנוני (`ingest_internal_decision`) + חילוץ מטא-דאטה לכל תיק (המסלול הפנימי לא מתזמן metadata — INV-ING3). רצף (לא מקבילי, להימנע מעומס CLI). רשימת `DECISIONS` נערכת ידנית לכל batch. config מ-`~/.env`. תומך תהליך [[project_precedent_incoming_workflow]]. | ידני, per-batch (חלופה ל-MCP `internal_decision_upload` כש-batch גדול) | -| `drain_halacha_queue.py` | python | ריקון תור חילוץ ההלכות (`process_pending_extractions kind='halacha'`) ב-batches של 4 עד שהתור ריק (2 סבבים ריקים). משמש אחרי `ingest_incoming_batch.py`. | ידני אחרי batch (חלופה ל-MCP `precedent_process_pending`) | +| `drain_halacha_queue.py` | python | ריקון תור חילוץ ההלכות (`process_pending_extractions kind='halacha'`) ב-batches של 4 עד שהתור ריק (2 סבבים ריקים). חילוץ-הלכות נשאר על claude_session (לא Gemini). self-heal ל-orphaned `processing`. ההלכות נוחתות `pending_review` (שער-יו"ר). | דרך `legal-halacha-drain.config.cjs` (pm2 cron) / ידני | +| `legal-halacha-drain.config.cjs` | pm2/js | **תזמון כל שעתיים של `drain_halacha_queue.py`** (cron `47 */2 * * *`, `HALACHA_DRAIN_CRON` לעקיפה) — מונע סתימה של תור חילוץ-ההלכות. קצב שמרני (Claude איטי + כל ריצה מוסיפה לתור-אישור-היו"ר). דורש claude CLI. התקנה: `pm2 start scripts/legal-halacha-drain.config.cjs && pm2 save`. | pm2 cron (host-side) | | `ingest_digests_batch.py` | python | קליטת batch של יומוני "כל יום" מ-`data/digests/incoming/` דרך המסלול העצמאי של קורפוס-הגילוי (`digest_library.ingest_digest`) — חילוץ-LLM (תג-מושג, כותרת-הלכה, מראה-מקום, שני-תאריכים), embedding יחיד, ו-autolink לפסק המקורי (X12/INV-DIG3). רצף (לא מקבילי). מזהה-יומון+תאריך נגזרים משם-הקובץ; העלון החודשי מדולג. **לא מעביר קבצים** — ה-DB (content_hash) הוא מקור-האמת היחיד; הרצה חוזרת מדלגת על קיימים (`exists`). config מ-`~/.env`. 
| ידני, per-batch (חלופה ל-MCP `digest_upload`) | | `drain_digests.py` | python | ריקון תור ההעשרה של יומונים (X12): מעבד כל digest בסטטוס `pending` דרך `digest_library.enrich_digest` (חילוץ-LLM Sonnet + embedding + autolink). מקבילי (CONCURRENCY=3, env-tunable), idempotent. מוסיף `~/.local/bin` ל-PATH כדי שה-claude CLI יימצא תחת cron. | **cron יומי** (10:00, אחרי ה-poll של n8n; flock למניעת חפיפה → `data/digests/drain.log`) + ידני אחרי backfill. חלופת-MCP: `digest_process_pending` | diff --git a/scripts/legal-halacha-drain.config.cjs b/scripts/legal-halacha-drain.config.cjs new file mode 100644 index 0000000..b228aac --- /dev/null +++ b/scripts/legal-halacha-drain.config.cjs @@ -0,0 +1,39 @@ +/** + * pm2 ecosystem entry for legal-halacha-drain — scheduled drain of the precedent + * halacha-extraction queue. Halacha extraction stays on claude_session (local + * CLI, high reasoning quality for holding/ratio) — unlike metadata which moved + * to Gemini. Extracted halachot land 'pending_review' for the chair's approval + * gate (INV-G10); this drainer only produces them, it never approves. + * + * The drain self-heals orphaned 'processing' rows (precedent_library) and is + * serialised by a global advisory lock, so overlapping ticks are safe. + * + * Pattern: cron_restart fires the script; autorestart:false → one-shot per tick + * (pm2 shows "stopped" between ticks). Cheap no-op when the queue is empty. + * Cadence is conservative (every 2h) because Claude extraction is slow/rate- + * limited and each run adds to the chair's review queue. + * + * Requires the local ``claude`` CLI + host ~/.env (POSTGRES_URL, etc.). + * + * Install (once): + * pm2 start /home/chaim/legal-ai/scripts/legal-halacha-drain.config.cjs + * pm2 save + * Run now (manual): mcp-server/.venv/bin/python scripts/drain_halacha_queue.py + * Schedule override: HALACHA_DRAIN_CRON (default every 2 hours at :47). 
+ */ +const cron = process.env.HALACHA_DRAIN_CRON || "47 */2 * * *"; + +module.exports = { + apps: [ + { + name: "legal-halacha-drain", + cwd: "/home/chaim/legal-ai", + script: "/home/chaim/legal-ai/mcp-server/.venv/bin/python", + args: "scripts/drain_halacha_queue.py", + env: { HOME: "/home/chaim", PYTHONUNBUFFERED: "1" }, + autorestart: false, // one-shot per cron tick + cron_restart: cron, + max_memory_restart: "800M", + }, + ], +};