"""Camoufox driver for mavat (מנהל התכנון) — pull תב"ע (statutory plan) identity + validity.

mavat sits behind an F5 BIG-IP ASM bot-wall: a scripted curl/httpx gets a
302→maintenance, but a real JS-executing browser on this server clears the
challenge (verified 2026-06-17). So, like X13's Net HaMishpat (נט-המשפט) flow,
we drive a **Camoufox** stealth browser over Xvfb — same engine, same host
service, no second port/secret (G2).

The proven flow (validated end-to-end on 101-1031020 → yalkut (י"פ) 13697 and
101-1053933 → yalkut (י"פ) 13836, two stable runs):

1. goto the SPA home; it redirects to ``/SV1`` once the F5 JS challenge
   resolves (TS* cookies set) — that is the normal landed state.
2. Type the plan number into ``#sv3-search__input`` (the only visible text
   input) and press Enter. The SPA POSTs ``/rest/api/sv3/Search`` with a
   reCAPTCHA token it supplies transparently — so reCAPTCHA must stay enabled
   (blocking it kills the token and results never render). For a unique plan
   number the SPA then **auto-navigates** to ``/SV4/1//310``.
3. That navigation fires ``GET /rest/api/SV4/1?mid=&guid=0`` (~55 KB JSON). It
   returns 200 only in the in-app navigation context, so we capture it off the
   SPA's own request (a standalone replay 404s).
4. Parse identity from ``planDetails`` and validity from ``rsInternet``: the
   row ``LIS_DESC == "פרסום לאישור ברשומות"`` (approval publication) carries
   ``ED_PUBLICATION_FILE`` (= yalkut number) and a ``DETAILS`` string with
   date + page. The separate "פרסום להפקדה ברשומות" row is the deposit
   (ignored).

Driver-crash workaround (required): the SV4 navigation throws an uncaught SPA
error that crashes the playwright-firefox driver (it reads
``pageError.location.url``). An init-script swallowing ``window.onerror`` +
``error``/``unhandledrejection`` (preventDefault) keeps the driver alive.

INV-AH: ``source_url`` is the mavat plan page; a field mavat doesn't expose
comes back empty, never guessed. This module only returns a candidate — the
chair gates it (review_status) before block ט cites it.

Operational requirements (shared with camofox_client): a virtual display
(``DISPLAY``=:99 via Xvfb) and ~0.5–1 GB RAM for the Firefox content process.
"""

from __future__ import annotations

import asyncio
import logging
import os
import re

# Reuse the X13 orphan-browser reaper (same camoufox-bin binary) — G2, no copy.
from legal_mcp.court_fetch_service.camofox_client import _reap_orphan_browsers

logger = logging.getLogger(__name__)

MAVAT_HOME = "https://mavat.iplan.gov.il/"
# Matches the URL of the plan-detail XHR whose JSON body is captured.
_SV4_RESP_RE = re.compile(r"/rest/api/SV4/1\?mid=", re.IGNORECASE)

# Read once at import time; an empty value means "no display configured".
_DISPLAY = os.environ.get("DISPLAY", "")
# The environment value is in seconds; it is converted to milliseconds here.
_NAV_TIMEOUT_MS = int(float(os.environ.get("PLAN_FETCH_BROWSER_TIMEOUT_S", "60")) * 1000)
# Upper bound, in seconds, on one whole fetch.
_FETCH_HARD_TIMEOUT_S = float(os.environ.get("PLAN_FETCH_HARD_TIMEOUT_S", "180"))

# Proven waits (both verification runs passed; the search box is absent before
# the F5 + Angular boot, and the SV4 XHR lands a few seconds after Enter).
_HOME_WAIT_MS = 8000
_SEARCH_WAIT_MS = 9000
# Poll budget for the SV4 capture: 8 tries x 4000 ms = 32 s at most.
_SV4_POLL_TRIES = 8
_SV4_POLL_MS = 4000

_SEARCH_INPUT = "#sv3-search__input"
# The gazette/yalkut status row vs the (ignored) deposit row.
_GAZETTE_LIS_DESC = "פרסום לאישור ברשומות"

# Swallow the SPA's uncaught SV4 error so the playwright-firefox driver survives.
_CRASH_GUARD_JS = """
window.addEventListener('error', function (e) {
  try { e.preventDefault(); } catch (x) {}
}, true);
window.addEventListener('unhandledrejection', function (e) {
  try { e.preventDefault(); } catch (x) {}
}, true);
window.onerror = function () { return true; };
"""

# Patterns applied to the free-text DETAILS field of the gazette row:
# "publication date: dd/mm/yyyy", "page: N" and "yalkut pirsumim: N".
_DATE_RE = re.compile(r"תאריך\s*פרסום\s*:?\s*(\d{1,2})/(\d{1,2})/(\d{4})")
_PAGE_RE = re.compile(r"עמוד\s*:?\s*(\d{1,6})")
_YALKUT_DETAILS_RE = re.compile(r"ילקוט\s*פרסומים\s*:?\s*(\d{2,6})")

# Shapes accepted for the structured row date (EIS_DATE), tried in this order.
_ISO_DATE_RE = re.compile(r"(\d{4})-(\d{2})-(\d{2})")
_DMY_DATE_RE = re.compile(r"(\d{1,2})/(\d{1,2})/(\d{4})")


class MavatUnavailable(RuntimeError):
    """Camoufox / its virtual display isn't available."""


class MavatFlowError(RuntimeError):
    """A step in the mavat flow failed (blocked / not found / not parsed)."""


def is_enabled() -> bool:
    """Return True when the ``camoufox`` async API can be imported."""
    try:
        import camoufox.async_api  # noqa: F401
    except Exception:
        # Deliberately broad: any import-time failure means "not usable here".
        return False
    return True


async def health() -> dict:
    """Report whether camoufox imports and which display is configured."""
    return {"camoufox_import": is_enabled(), "display": _DISPLAY or "(none)"}


# ─── payload parsing ──────────────────────────────────────────────────────────


def _s(v: object) -> str:
    """Return *v* stripped when it is a string, otherwise ``""``."""
    return v.strip() if isinstance(v, str) else ""


def _yalkut_str(v: object) -> str:
    """Render a yalkut number as a clean digit string, or ``""`` when unusable.

    ED_PUBLICATION_FILE comes as a float (13697.0). Booleans, non-finite
    floats and non-positive numbers are treated as "not provided" so that the
    caller can fall back to the DETAILS text instead of emitting a bogus value.
    Strings yield their first run of 2-6 digits.
    """
    if isinstance(v, bool):
        # bool is an int subclass; True must not be rendered as "1".
        return ""
    if isinstance(v, (int, float)):
        try:
            number = int(v)
        except (ValueError, OverflowError):
            # int(nan) raises ValueError, int(inf) raises OverflowError.
            return ""
        return str(number) if number > 0 else ""
    found = re.search(r"\d{2,6}", _s(v))
    return found.group(0) if found else ""


def _iso_date(year: object, month: object, day: object) -> str:
    """Return ``YYYY-MM-DD`` for a real calendar date, otherwise ``""``."""
    from datetime import date  # function-scope import: only this helper needs it

    try:
        return date(int(year), int(month), int(day)).isoformat()
    except (TypeError, ValueError):
        # Impossible dates (e.g. 31/02) come back empty rather than guessed.
        return ""


def _row_date_to_iso(raw: str) -> str:
    """Convert a structured row date (ISO-ish or dd/mm/yyyy) to ``YYYY-MM-DD``.

    The branch is chosen by which pattern matched, so a dd/mm/yyyy value that
    merely contains a hyphen elsewhere is still converted correctly.
    """
    iso = _ISO_DATE_RE.search(raw)
    if iso:
        return _iso_date(*iso.groups())
    dmy = _DMY_DATE_RE.search(raw)
    if dmy:
        day, month, year = dmy.groups()
        return _iso_date(year, month, day)
    return ""


def _parse_gazette_row(row: dict) -> tuple[str, str, str]:
    """Extract ``(gazette_date, yalkut_number, yalkut_page)`` from one row.

    The structured ED_PUBLICATION_FILE wins for the yalkut number and the
    DETAILS text wins for the date; each falls back to the other source
    (DETAILS text / EIS_DATE) when its primary is missing or invalid.
    """
    details = _s(row.get("DETAILS"))

    yalkut_number = _yalkut_str(row.get("ED_PUBLICATION_FILE"))
    if not yalkut_number:
        in_details = _YALKUT_DETAILS_RE.search(details)
        if in_details:
            yalkut_number = in_details.group(1)

    gazette_date = ""
    published = _DATE_RE.search(details)
    if published:
        day, month, year = published.groups()
        gazette_date = _iso_date(year, month, day)
    if not gazette_date:
        gazette_date = _row_date_to_iso(_s(row.get("EIS_DATE")))

    page = _PAGE_RE.search(details)
    yalkut_page = page.group(1) if page else ""
    return gazette_date, yalkut_number, yalkut_page


def _parse_sv4(sv4: dict, plan_number: str, source_url: str) -> dict:
    """Map an SV4 plan-detail JSON object to our registry-candidate fields.

    Identity lives in ``planDetails``; validity in the top-level ``rsInternet``.
    Anything missing or of an unexpected type yields an empty string.

    Args:
        sv4: Decoded SV4 JSON payload (non-dict input is tolerated).
        plan_number: Requested plan number, used when ``planDetails.NUMB`` is empty.
        source_url: Passed through unchanged into the result.

    Returns:
        Dict with keys ``plan_number``, ``display_name``, ``plan_type``,
        ``purpose``, ``gazette_date``, ``yalkut_number``, ``yalkut_page`` and
        ``source_url``.
    """
    root = sv4 if isinstance(sv4, dict) else {}
    pd = root.get("planDetails")
    pd = pd if isinstance(pd, dict) else {}

    auth = _s(pd.get("AUTH"))
    subtype = _s(pd.get("ENTITY_SUBTYPE"))
    plan_type = f"{auth} ({subtype})" if auth and subtype else (auth or subtype)

    rows = root.get("rsInternet")
    rows = rows if isinstance(rows, list) else []
    gazette_date = yalkut_number = yalkut_page = ""
    for row in rows:
        if isinstance(row, dict) and _s(row.get("LIS_DESC")) == _GAZETTE_LIS_DESC:
            gazette_date, yalkut_number, yalkut_page = _parse_gazette_row(row)
            break  # only the first matching gazette row is used

    return {
        "plan_number": _s(pd.get("NUMB")) or plan_number,
        "display_name": _s(pd.get("E_NAME")),
        "plan_type": plan_type,
        "purpose": _s(pd.get("GOALS")),
        "gazette_date": gazette_date,
        "yalkut_number": yalkut_number,
        "yalkut_page": yalkut_page,
        "source_url": source_url,
    }


# ─── driver ───────────────────────────────────────────────────────────────────


async def fetch_plan(plan_number: str) -> dict:
    """Drive mavat for one plan; return the registry-candidate dict.

    Args:
        plan_number: Plan number to type into the search box (whitespace is stripped).

    Returns:
        The dict produced by ``_parse_sv4`` for the captured SV4 payload.

    Raises:
        MavatUnavailable: camoufox cannot be imported, or no DISPLAY is set.
        MavatFlowError: empty plan number, search box not reachable, SV4 not
            captured, SV4 without a plan name, or the hard timeout elapsed.
    """
    plan_number = (plan_number or "").strip()
    if not plan_number:
        raise MavatFlowError("חסר מספר-תכנית")
    try:
        from camoufox.async_api import AsyncCamoufox
    except Exception as e:
        raise MavatUnavailable(
            "חבילת camoufox אינה מותקנת/זמינה. ראה docs/spec/X13-court-fetch.md."
        ) from e
    if not _DISPLAY:
        raise MavatUnavailable(
            "אין DISPLAY — Camoufox דורש Xvfb על שרת ללא מסך (למשל :99)."
        )
    _reap_orphan_browsers()

    async def _run() -> dict:
        captured: dict = {"sv4": None, "sv4_url": ""}
        # Strong references to in-flight handler tasks: the event loop keeps
        # only weak references, so an unreferenced task may be collected early.
        pending: set = set()

        async def on_resp(resp) -> None:
            if captured["sv4"] is not None or not _SV4_RESP_RE.search(resp.url):
                return
            try:
                payload = await resp.json()
            except Exception as exc:
                # Best effort: a racing/non-JSON response must not kill the flow.
                logger.debug("mavat: ignoring unreadable SV4 response %s: %s", resp.url, exc)
                return
            # Re-check after the await so the first decoded payload wins.
            if captured["sv4"] is None:
                captured["sv4"] = payload
                captured["sv4_url"] = resp.url

        def spawn_handler(resp) -> None:
            task = asyncio.create_task(on_resp(resp))
            pending.add(task)
            task.add_done_callback(pending.discard)

        async with AsyncCamoufox(
            headless=True, geoip=False, humanize=True, locale="he-IL"
        ) as browser:
            page = await browser.new_page()
            await page.add_init_script(_CRASH_GUARD_JS)
            page.context.on("response", spawn_handler)

            # 1) home → let F5 ASM resolve (lands on /SV1; search box appears).
            await page.goto(MAVAT_HOME, wait_until="domcontentloaded", timeout=_NAV_TIMEOUT_MS)
            await page.wait_for_timeout(_HOME_WAIT_MS)

            # 2) type the plan number + Enter → sv3/Search → SPA auto-navigates to SV4.
            box = page.locator(_SEARCH_INPUT)
            try:
                await box.wait_for(state="visible", timeout=_NAV_TIMEOUT_MS)
                await box.fill(plan_number)
                await box.press("Enter")
            except Exception as e:
                raise MavatFlowError(
                    f"שדה-החיפוש ({_SEARCH_INPUT}) לא נמצא/לא נגיש: {e}"
                ) from e
            await page.wait_for_timeout(_SEARCH_WAIT_MS)

            # 3) the SV4 GET is captured by on_resp; poll until it lands.
            for _ in range(_SV4_POLL_TRIES):
                if captured["sv4"] is not None:
                    break
                await page.wait_for_timeout(_SV4_POLL_MS)

            sv4 = captured["sv4"]
            if sv4 is None:
                raise MavatFlowError(
                    "לא נלכד SV4 מ-mavat — ייתכן שהתכנית לא נמצאה, ריבוי-תוצאות, או חסימת-F5."
                )

            # NOTE(review): sv4_url is the URL of the captured REST response, not
            # the browser's page URL — confirm this is what source_url consumers expect.
            parsed = _parse_sv4(sv4, plan_number, captured["sv4_url"] or MAVAT_HOME)
            if not parsed["display_name"]:
                raise MavatFlowError("SV4 נלכד אך ללא שם-תכנית (planDetails.E_NAME) — פענוח נכשל.")
            logger.info(
                "mavat: fetched %s — name=%r gazette=%s yalkut=%s",
                plan_number, parsed["display_name"], parsed["gazette_date"],
                parsed["yalkut_number"],
            )
            return parsed

    try:
        return await asyncio.wait_for(_run(), _FETCH_HARD_TIMEOUT_S)
    except asyncio.TimeoutError as e:
        raise MavatFlowError(
            f"משיכת-התכנית עברה את מגבלת-הזמן ({_FETCH_HARD_TIMEOUT_S:.0f}ש') ובוטלה"
        ) from e
    finally:
        # Runs on every exit path (success, flow error, timeout).
        _reap_orphan_browsers()