From f56309da5a6f9ef7dc827debb4bb85554c1dd8fa Mon Sep 17 00:00:00 2001 From: Chaim Date: Sun, 7 Jun 2026 20:04:12 +0000 Subject: [PATCH] feat(X13): auto-trigger court fetch from digests + drain tool MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit סוגר את הלולאה — יומון שמצביע על פס"ד בית-משפט שלא בקורפוס מזניק אחזור אוטומטי, וקושר את היומון חזרה אחרי הקליטה (INV-DIG3 + INV-CF2). - digest_library.try_autolink: בכשל-קישור, אם הציטוט מסווג כפס"ד-בימ"ש (supreme/admin) → _enqueue_court_fetch יוצר court_fetch_jobs(pending); ועדת-ערר (skip) לא מוזנק. never-raises (לא שובר קליטת-יומון). - orchestrator.drain_pending(limit): מנקז pending/failed סדרתי (cooldown, INV-CF4), fetch+ingest לכל אחד; בהצלחה מקשר את היומון ל-case_law שנקלט. - כלי-MCP court_fetch_drain + רישום ב-server.py. - X13 spec: עודכן (הפער ב-INV-CF2 סומן כמתוקן). נבדק מול ה-DB: עת"מ 46111-12-22 → job tier=admin pending digest-linked; ערר 1110/20 → לא מוזנק. כלי מקומי בלבד (ingest = claude CLI). Co-Authored-By: Claude Opus 4.8 (1M context) --- docs/spec/X13-court-fetch.md | 6 +-- mcp-server/src/legal_mcp/server.py | 6 +++ .../services/court_fetch_orchestrator.py | 46 +++++++++++++++++++ .../src/legal_mcp/services/digest_library.py | 29 ++++++++++++ mcp-server/src/legal_mcp/tools/court_fetch.py | 10 ++++ 5 files changed, 94 insertions(+), 3 deletions(-) diff --git a/docs/spec/X13-court-fetch.md b/docs/spec/X13-court-fetch.md index edd6483..c6f9a96 100644 --- a/docs/spec/X13-court-fetch.md +++ b/docs/spec/X13-court-fetch.md @@ -72,7 +72,7 @@ underlying_citation → [classifier] → tier ∈ {supreme, admin, skip} לא נזרק בשקט. `except: pass` אסור. **מקור-סמכות:** פרויקטלי-תפעולי — מיישם את [G4](00-constitution.md#inv-g4) וכלל-ההנדסה "אין בליעה שקטה" (§6). **אכיפה:** טבלת `court_fetch_jobs` (status+error+attempts) + לוג-warning בכל כישלון + Tier-2 gate. -**הפרה ידועה:** הפער הקיים ב-X12 — `try_autolink` שנכשל מחזיר `None` בשקט (יתוקן ע"י טריגר זה). +**הפרה ידועה:** ~~הפער ב-X12 — `try_autolink` שנכשל מחזיר `None` בשקט~~ → **תוקן**: `try_autolink` שנכשל על ציטוט פס"ד-בימ"ש מזניק job ל-`court_fetch_jobs` (status=pending); `court_fetch_drain` מנקז (סדרתי) ומקשר את היומון חזרה בהצלחה. ### INV-CF3: אוטונומי-first, שער-אנושי חובה ב-fallback **כלל:** האחזור מנסה אוטונומית; אך כש-N נסיונות נכשלים, **שער-אנושי** (VNC לפתרון-CAPTCHA @@ -151,8 +151,8 @@ Service / responsible automation) | סטטוס: verified | proxy בקונטיינר | `web/court_fetch_proxy.py` | שכפול `web/chat_proxy.py` | | pm2 | `scripts/legal-court-fetch-service.config.cjs` | שכפול `legal-chat-service.config.cjs` | | אורקסטרטור+תור | `services/court_fetch_orchestrator.py` + `db.py` (SCHEMA_Vxx) | דפוס-תור קיים | -| כלי-MCP | `tools/court_fetch.py` (`court_verdict_fetch`) | חוזה-envelope [X9](X9-mcp-tool-contract.md) | -| טריגר | `services/digest_library.py` (`try_autolink` fail-path) | X12 | +| כלי-MCP | `tools/court_fetch.py` (`court_verdict_fetch` / `court_fetch_status` / `court_fetch_drain`) | חוזה-envelope [X9](X9-mcp-tool-contract.md) | +| טריגר אוטומטי | `services/digest_library.py` (`try_autolink` fail → `_enqueue_court_fetch`) → drain ע"י `orchestrator.drain_pending` | X12 | | סוד | `COURT_FETCH_SHARED_SECRET` (Infisical + Coolify) | דפוס `LEGAL_CHAT_SHARED_SECRET`, [X10](X10-deploy-env-secrets.md) | --- diff --git a/mcp-server/src/legal_mcp/server.py b/mcp-server/src/legal_mcp/server.py index 07b3233..938dbbb 100644 --- a/mcp-server/src/legal_mcp/server.py +++ b/mcp-server/src/legal_mcp/server.py @@ -988,6 +988,12 @@ async def court_fetch_status(case_number: str = "", status_filter: str = "") -> return await cf_tools.court_fetch_status(case_number, status_filter) +@mcp.tool() +async def court_fetch_drain(limit: int = 10) -> str: + """ריקון תור-אחזור הפסיקה — מוריד וקולט jobs ממתינים שהיומונים מילאו, וקושר חזרה ליומון. מקומי בלבד.""" + return await cf_tools.court_fetch_drain(limit) + + # ── Internal citations graph (TaskMaster #34) ───────────────────── diff --git a/mcp-server/src/legal_mcp/services/court_fetch_orchestrator.py b/mcp-server/src/legal_mcp/services/court_fetch_orchestrator.py index d7a1cea..833f03e 100644 --- a/mcp-server/src/legal_mcp/services/court_fetch_orchestrator.py +++ b/mcp-server/src/legal_mcp/services/court_fetch_orchestrator.py @@ -204,10 +204,56 @@ async def fetch_and_ingest( case_law_id=UUID(str(case_law_id)) if case_law_id else None, source_url=source_url, error="", ) + # Close the digest gap (INV-DIG3): if this fetch traces back to a digest, + # link it to the freshly-ingested ruling. Best-effort; never fails the job. + link_digest_id = digest_id or job.get("digest_id") + if case_law_id and link_digest_id: + try: + await db.link_digest_to_case_law(link_digest_id, UUID(str(case_law_id))) + logger.info("linked digest %s → case_law %s", link_digest_id, case_law_id) + except Exception: + logger.warning("could not relink digest %s after fetch", link_digest_id) return {"status": "done", "tier": cit.tier, "case_law_id": case_law_id, "citation": citation, "source_url": source_url, "ingest": result} +# Politeness between consecutive court fetches in a drain (INV-CF4) — serial, +# spaced. Mirrors the precedent-extraction queue cadence. +_INTER_FETCH_COOLDOWN_S = float(os.environ.get("COURT_FETCH_DRAIN_COOLDOWN_S", "20")) + + +async def drain_pending(limit: int = 10) -> dict: + """Process queued court-fetch jobs (status pending/failed) serially. + + Drains the ``court_fetch_jobs`` queue the digest trigger fills — fetch + + ingest each, link back to its digest. Serial with a cooldown (INV-CF4); a + job that fails is recorded and retried next drain until it escalates to + ``manual`` (INV-CF3). Local-only (runs the ingest pipeline / claude CLI). + """ + import asyncio + + jobs = await db.court_fetch_job_list(status="pending", limit=limit) + jobs += await db.court_fetch_job_list(status="failed", limit=limit) + seen, queue = set(), [] + for j in jobs: + k = j["case_number_norm"] + if k not in seen: + seen.add(k); queue.append(j) + results = [] + for i, j in enumerate(queue[:limit]): + if i: + await asyncio.sleep(_INTER_FETCH_COOLDOWN_S) + digest_id = j.get("digest_id") + try: + r = await fetch_and_ingest(j["citation_raw"], digest_id=digest_id) + except Exception as e: # noqa: BLE001 — recorded per-job, never aborts the drain + logger.exception("drain item failed: %s", j["case_number_norm"]) + r = {"status": "error", "citation": j["citation_raw"], "error": str(e)} + results.append(r) + done = sum(1 for r in results if r.get("status") in ("done", "already_done")) + return {"processed": len(results), "done": done, "results": results} + + async def _record_failure( job_id: UUID, cit: court_citation.CourtCitation, citation: str, err: str ) -> dict: diff --git a/mcp-server/src/legal_mcp/services/digest_library.py b/mcp-server/src/legal_mcp/services/digest_library.py index 6f37799..92ef89e 100644 --- a/mcp-server/src/legal_mcp/services/digest_library.py +++ b/mcp-server/src/legal_mcp/services/digest_library.py @@ -83,11 +83,40 @@ async def try_autolink(digest_id: UUID | str, underlying_citation: str) -> str | logger.warning("digest try_autolink lookup failed for %r: %s", citation, e) return None if not match: + # Gap (INV-DIG3): the underlying ruling isn't in the corpus. If it's a + # court verdict (not ועדת-ערר), enqueue an X13 auto-fetch job so the gap + # is actionable instead of silently dropped (INV-CF2). Never raises. + await _enqueue_court_fetch(digest_id, citation) return None await db.link_digest_to_case_law(digest_id, match["id"]) return str(match["id"]) +async def _enqueue_court_fetch(digest_id: UUID | str, citation: str) -> None: + """Queue an X13 court-verdict fetch for an unlinked digest citation. + + Court rulings (supreme/admin) → a ``court_fetch_jobs`` row drained later by + ``court_fetch_drain``. ועדת-ערר (skip) is left alone — it needs Nevo and is + surfaced through the normal missing-precedent path, not auto-fetch. + """ + try: + from legal_mcp.services import court_citation + cit = court_citation.classify(citation) + if cit.tier not in ("supreme", "admin"): + return + await db.court_fetch_job_upsert( + case_number_norm=cit.case_number_norm, + citation_raw=citation, + tier=cit.tier, + court=cit.court_prefix, + digest_id=UUID(str(digest_id)), + ) + logger.info("digest %s: enqueued court-fetch for %r (tier=%s)", + digest_id, citation, cit.tier) + except Exception as e: # never break digest ingest + logger.warning("digest court-fetch enqueue failed for %r: %s", citation, e) + + # ── Container-safe creation (web upload) — no LLM, no embedding ────── async def create_pending_digest( diff --git a/mcp-server/src/legal_mcp/tools/court_fetch.py b/mcp-server/src/legal_mcp/tools/court_fetch.py index e4c44b2..a8745f6 100644 --- a/mcp-server/src/legal_mcp/tools/court_fetch.py +++ b/mcp-server/src/legal_mcp/tools/court_fetch.py @@ -54,3 +54,13 @@ async def court_fetch_status(case_number: str = "", status_filter: str = "") -> return _ok({"job": job}) jobs = await db.court_fetch_job_list(status=status_filter.strip() or None) return _ok({"jobs": jobs, "count": len(jobs)}) + + +async def court_fetch_drain(limit: int = 10) -> str: + """ריקון תור-האחזור: מוריד וקולט את ה-jobs הממתינים (pending/failed) שהיומונים + מילאו, וקושר כל פסק שנקלט חזרה ליומון-המקור. סדרתי. כלי מקומי בלבד.""" + try: + result = await orch.drain_pending(limit=max(1, min(int(limit or 10), 50))) + except Exception as e: # noqa: BLE001 + return _err(f"ריקון התור נכשל: {e}") + return _ok(result, message=f"עובדו {result.get('processed', 0)}, נקלטו {result.get('done', 0)}") -- 2.49.1