feat(X13): auto-trigger court fetch from digests + drain tool

סוגר את הלולאה — יומון שמצביע על פס"ד בית-משפט שלא בקורפוס מזניק אחזור
אוטומטי, וקושר את היומון חזרה אחרי הקליטה (INV-DIG3 + INV-CF2).

- digest_library.try_autolink: בכשל-קישור, אם הציטוט מסווג כפס"ד-בימ"ש
  (supreme/admin) → _enqueue_court_fetch יוצר court_fetch_jobs(pending);
  ועדת-ערר (skip) לא מוזנק. never-raises (לא שובר קליטת-יומון).
- orchestrator.drain_pending(limit): מנקז pending/failed סדרתי (cooldown,
  INV-CF4), fetch+ingest לכל אחד; בהצלחה מקשר את היומון ל-case_law שנקלט.
- כלי-MCP court_fetch_drain + רישום ב-server.py.
- X13 spec: עודכן (הפער ב-INV-CF2 סומן כמתוקן).

נבדק מול ה-DB: עת"מ 46111-12-22 → job tier=admin pending digest-linked;
ערר 1110/20 → לא מוזנק. כלי מקומי בלבד (ingest = claude CLI).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-07 20:04:12 +00:00
parent 635dc98492
commit f56309da5a
5 changed files with 94 additions and 3 deletions

View File

@@ -72,7 +72,7 @@ underlying_citation → [classifier] → tier ∈ {supreme, admin, skip}
לא נזרק בשקט. `except: pass` אסור. לא נזרק בשקט. `except: pass` אסור.
**מקור-סמכות:** פרויקטלי-תפעולי — מיישם את [G4](00-constitution.md#inv-g4) וכלל-ההנדסה "אין בליעה שקטה" (§6). **מקור-סמכות:** פרויקטלי-תפעולי — מיישם את [G4](00-constitution.md#inv-g4) וכלל-ההנדסה "אין בליעה שקטה" (§6).
**אכיפה:** טבלת `court_fetch_jobs` (status+error+attempts) + לוג-warning בכל כישלון + Tier-2 gate. **אכיפה:** טבלת `court_fetch_jobs` (status+error+attempts) + לוג-warning בכל כישלון + Tier-2 gate.
**הפרה ידועה:** הפער הקיים ב-X12 — `try_autolink` שנכשל מחזיר `None` בשקט (יתוקן ע"י טריגר זה). **הפרה ידועה:** ~~הפער ב-X12 — `try_autolink` שנכשל מחזיר `None` בשקט~~**תוקן**: `try_autolink` שנכשל על ציטוט פס"ד-בימ"ש מזניק job ל-`court_fetch_jobs` (status=pending); `court_fetch_drain` מנקז (סדרתי) ומקשר את היומון חזרה בהצלחה.
### INV-CF3: אוטונומי-first, שער-אנושי חובה ב-fallback ### INV-CF3: אוטונומי-first, שער-אנושי חובה ב-fallback
**כלל:** האחזור מנסה אוטונומית; אך כש-N נסיונות נכשלים, **שער-אנושי** (VNC לפתרון-CAPTCHA **כלל:** האחזור מנסה אוטונומית; אך כש-N נסיונות נכשלים, **שער-אנושי** (VNC לפתרון-CAPTCHA
@@ -151,8 +151,8 @@ Service / responsible automation) | סטטוס: verified
| proxy בקונטיינר | `web/court_fetch_proxy.py` | שכפול `web/chat_proxy.py` | | proxy בקונטיינר | `web/court_fetch_proxy.py` | שכפול `web/chat_proxy.py` |
| pm2 | `scripts/legal-court-fetch-service.config.cjs` | שכפול `legal-chat-service.config.cjs` | | pm2 | `scripts/legal-court-fetch-service.config.cjs` | שכפול `legal-chat-service.config.cjs` |
| אורקסטרטור+תור | `services/court_fetch_orchestrator.py` + `db.py` (SCHEMA_Vxx) | דפוס-תור קיים | | אורקסטרטור+תור | `services/court_fetch_orchestrator.py` + `db.py` (SCHEMA_Vxx) | דפוס-תור קיים |
| כלי-MCP | `tools/court_fetch.py` (`court_verdict_fetch`) | חוזה-envelope [X9](X9-mcp-tool-contract.md) | | כלי-MCP | `tools/court_fetch.py` (`court_verdict_fetch` / `court_fetch_status` / `court_fetch_drain`) | חוזה-envelope [X9](X9-mcp-tool-contract.md) |
| טריגר | `services/digest_library.py` (`try_autolink` fail-path) | X12 | | טריגר אוטומטי | `services/digest_library.py` (`try_autolink` fail`_enqueue_court_fetch`) → drain ע"י `orchestrator.drain_pending` | X12 |
| סוד | `COURT_FETCH_SHARED_SECRET` (Infisical + Coolify) | דפוס `LEGAL_CHAT_SHARED_SECRET`, [X10](X10-deploy-env-secrets.md) | | סוד | `COURT_FETCH_SHARED_SECRET` (Infisical + Coolify) | דפוס `LEGAL_CHAT_SHARED_SECRET`, [X10](X10-deploy-env-secrets.md) |
--- ---

View File

@@ -988,6 +988,12 @@ async def court_fetch_status(case_number: str = "", status_filter: str = "") ->
return await cf_tools.court_fetch_status(case_number, status_filter) return await cf_tools.court_fetch_status(case_number, status_filter)
@mcp.tool()
async def court_fetch_drain(limit: int = 10) -> str:
"""ריקון תור-אחזור הפסיקה — מוריד וקולט jobs ממתינים שהיומונים מילאו, וקושר חזרה ליומון. מקומי בלבד."""
return await cf_tools.court_fetch_drain(limit)
# ── Internal citations graph (TaskMaster #34) ───────────────────── # ── Internal citations graph (TaskMaster #34) ─────────────────────

View File

@@ -204,10 +204,56 @@ async def fetch_and_ingest(
case_law_id=UUID(str(case_law_id)) if case_law_id else None, case_law_id=UUID(str(case_law_id)) if case_law_id else None,
source_url=source_url, error="", source_url=source_url, error="",
) )
# Close the digest gap (INV-DIG3): if this fetch traces back to a digest,
# link it to the freshly-ingested ruling. Best-effort; never fails the job.
link_digest_id = digest_id or job.get("digest_id")
if case_law_id and link_digest_id:
try:
await db.link_digest_to_case_law(link_digest_id, UUID(str(case_law_id)))
logger.info("linked digest %s → case_law %s", link_digest_id, case_law_id)
except Exception:
logger.warning("could not relink digest %s after fetch", link_digest_id)
return {"status": "done", "tier": cit.tier, "case_law_id": case_law_id, return {"status": "done", "tier": cit.tier, "case_law_id": case_law_id,
"citation": citation, "source_url": source_url, "ingest": result} "citation": citation, "source_url": source_url, "ingest": result}
# Politeness between consecutive court fetches in a drain (INV-CF4) — serial,
# spaced. Mirrors the precedent-extraction queue cadence.
_INTER_FETCH_COOLDOWN_S = float(os.environ.get("COURT_FETCH_DRAIN_COOLDOWN_S", "20"))
async def drain_pending(limit: int = 10) -> dict:
"""Process queued court-fetch jobs (status pending/failed) serially.
Drains the ``court_fetch_jobs`` queue the digest trigger fills — fetch +
ingest each, link back to its digest. Serial with a cooldown (INV-CF4); a
job that fails is recorded and retried next drain until it escalates to
``manual`` (INV-CF3). Local-only (runs the ingest pipeline / claude CLI).
"""
import asyncio
jobs = await db.court_fetch_job_list(status="pending", limit=limit)
jobs += await db.court_fetch_job_list(status="failed", limit=limit)
seen, queue = set(), []
for j in jobs:
k = j["case_number_norm"]
if k not in seen:
seen.add(k); queue.append(j)
results = []
for i, j in enumerate(queue[:limit]):
if i:
await asyncio.sleep(_INTER_FETCH_COOLDOWN_S)
digest_id = j.get("digest_id")
try:
r = await fetch_and_ingest(j["citation_raw"], digest_id=digest_id)
except Exception as e: # noqa: BLE001 — recorded per-job, never aborts the drain
logger.exception("drain item failed: %s", j["case_number_norm"])
r = {"status": "error", "citation": j["citation_raw"], "error": str(e)}
results.append(r)
done = sum(1 for r in results if r.get("status") in ("done", "already_done"))
return {"processed": len(results), "done": done, "results": results}
async def _record_failure( async def _record_failure(
job_id: UUID, cit: court_citation.CourtCitation, citation: str, err: str job_id: UUID, cit: court_citation.CourtCitation, citation: str, err: str
) -> dict: ) -> dict:

View File

@@ -83,11 +83,40 @@ async def try_autolink(digest_id: UUID | str, underlying_citation: str) -> str |
logger.warning("digest try_autolink lookup failed for %r: %s", citation, e) logger.warning("digest try_autolink lookup failed for %r: %s", citation, e)
return None return None
if not match: if not match:
# Gap (INV-DIG3): the underlying ruling isn't in the corpus. If it's a
# court verdict (not ועדת-ערר), enqueue an X13 auto-fetch job so the gap
# is actionable instead of silently dropped (INV-CF2). Never raises.
await _enqueue_court_fetch(digest_id, citation)
return None return None
await db.link_digest_to_case_law(digest_id, match["id"]) await db.link_digest_to_case_law(digest_id, match["id"])
return str(match["id"]) return str(match["id"])
async def _enqueue_court_fetch(digest_id: UUID | str, citation: str) -> None:
"""Queue an X13 court-verdict fetch for an unlinked digest citation.
Court rulings (supreme/admin) → a ``court_fetch_jobs`` row drained later by
``court_fetch_drain``. ועדת-ערר (skip) is left alone — it needs Nevo and is
surfaced through the normal missing-precedent path, not auto-fetch.
"""
try:
from legal_mcp.services import court_citation
cit = court_citation.classify(citation)
if cit.tier not in ("supreme", "admin"):
return
await db.court_fetch_job_upsert(
case_number_norm=cit.case_number_norm,
citation_raw=citation,
tier=cit.tier,
court=cit.court_prefix,
digest_id=UUID(str(digest_id)),
)
logger.info("digest %s: enqueued court-fetch for %r (tier=%s)",
digest_id, citation, cit.tier)
except Exception as e: # never break digest ingest
logger.warning("digest court-fetch enqueue failed for %r: %s", citation, e)
# ── Container-safe creation (web upload) — no LLM, no embedding ────── # ── Container-safe creation (web upload) — no LLM, no embedding ──────
async def create_pending_digest( async def create_pending_digest(

View File

@@ -54,3 +54,13 @@ async def court_fetch_status(case_number: str = "", status_filter: str = "") ->
return _ok({"job": job}) return _ok({"job": job})
jobs = await db.court_fetch_job_list(status=status_filter.strip() or None) jobs = await db.court_fetch_job_list(status=status_filter.strip() or None)
return _ok({"jobs": jobs, "count": len(jobs)}) return _ok({"jobs": jobs, "count": len(jobs)})
async def court_fetch_drain(limit: int = 10) -> str:
"""ריקון תור-האחזור: מוריד וקולט את ה-jobs הממתינים (pending/failed) שהיומונים
מילאו, וקושר כל פסק שנקלט חזרה ליומון-המקור. סדרתי. כלי מקומי בלבד."""
try:
result = await orch.drain_pending(limit=max(1, min(int(limit or 10), 50)))
except Exception as e: # noqa: BLE001
return _err(f"ריקון התור נכשל: {e}")
return _ok(result, message=f"עובדו {result.get('processed', 0)}, נקלטו {result.get('done', 0)}")