"""Host-side HTTP bridge for Tier-1 verdict fetching (X13). Mirrors ``legal_mcp.chat_service.server`` — the proven host-side pattern: an aiohttp app, bound to the docker bridge gateway, Bearer-auth, that does the one thing the container can't (here: drive a real browser against נט המשפט). Endpoints: POST /fetch body {file_number, month, year, case_number, court} → {ok, content_b64, filename, source_url, court, reason} REQUIRES Authorization: Bearer . GET /health liveness (no auth); reports camofox + VNC URL if available. GET /pm2 read-only pm2 status of legal-* / paperclip services (no auth). POST /pm2/control body {name, action: restart|stop|start} → run pm2 on a whitelisted legal-* process. REQUIRES Bearer (mutating). Run with pm2: pm2 start scripts/legal-court-fetch-service.config.cjs Security posture (identical rationale to legal-chat-service): 1. Bind defaults to ``10.0.1.1`` (docker0 bridge gateway) — reachable from the host + containers on docker bridges, invisible to outside networks. 2. ``/fetch`` requires a Bearer token (constant-time compare); the service refuses to start without ``COURT_FETCH_SHARED_SECRET`` set. 3. ``/health`` is unauthenticated and spawns nothing. 
""" from __future__ import annotations import argparse import base64 import hmac import json import logging import os import sys import time import aiohttp from aiohttp import web _pkg_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) if _pkg_root not in sys.path: sys.path.insert(0, _pkg_root) from legal_mcp.court_fetch_service import camofox_client # noqa: E402 from legal_mcp.court_fetch_service import mavat_client # noqa: E402 from legal_mcp.services import usage_limits # noqa: E402 from legal_mcp.services import script_runner # noqa: E402 logger = logging.getLogger("legal_court_fetch_service") _SHARED_SECRET: str = "" async def health(request: web.Request) -> web.Response: info = {"ok": True, "service": "legal-court-fetch-service", "camofox_enabled": camofox_client.is_enabled()} if camofox_client.is_enabled(): try: info["camofox"] = await camofox_client.health() except Exception as e: # health must never throw info["camofox_error"] = str(e) return web.json_response(info) # Background services we surface on the /operations dashboard. pm2 jlist is a # host-only capability (the legal-ai container can't run pm2), so the container's # FastAPI proxies this read-only endpoint over the docker bridge. No secret: # pm2 status (names/cpu/mem) carries nothing sensitive and the bind (10.0.1.1) # is already host/container-only. 
_PM2_PREFIXES = ("legal-", "paperclip")


def _trim_service(a: dict) -> dict:
    """Project one ``pm2 jlist`` app entry onto the fields the dashboard shows.

    ``pm2_env`` / ``monit`` may be absent or null; both are treated as empty,
    so every key below is always present (with a zero/empty default).
    """
    env = a.get("pm2_env") or {}
    monit = a.get("monit") or {}
    return {
        "name": a.get("name", ""),
        "status": env.get("status", ""),
        "restarts": env.get("restart_time", 0),
        "uptime_ms": env.get("pm_uptime", 0),
        "cpu": monit.get("cpu", 0),
        "memory_bytes": monit.get("memory", 0),
        "cron": env.get("cron_restart") or "",
        "autorestart": env.get("autorestart", True),
    }


async def _pm2_run(*args: str, timeout: float = 10) -> tuple[int, bytes, bytes]:
    """Run ``pm2 <args>`` (no shell) and return ``(returncode, stdout, stderr)``.

    Raises:
        FileNotFoundError: pm2 is not on PATH.
        asyncio.TimeoutError: the command outlived *timeout* seconds. The child
            is killed and reaped before the error propagates — ``wait_for``
            alone only cancels ``communicate()`` and would leave it running.
    """
    import asyncio as _asyncio

    proc = await _asyncio.create_subprocess_exec(
        "pm2", *args,
        stdout=_asyncio.subprocess.PIPE,
        stderr=_asyncio.subprocess.PIPE,
    )
    try:
        out, err = await _asyncio.wait_for(proc.communicate(), timeout=timeout)
    except _asyncio.TimeoutError:
        try:
            proc.kill()
        except ProcessLookupError:  # exited between the timeout and the kill
            pass
        await proc.wait()
        raise
    return proc.returncode or 0, out, err


# /operations polls every 5s; the usage endpoint 429s if hit that often (it's
# meant for a status bar, not a poll loop). Cache the last good payload and only
# re-fetch when older than this — Anthropic sees ~1 req/min regardless of how
# many dashboards poll. The 5-hour window moves slowly, so 60s is plenty fresh.
# A failed fetch is remembered for the same interval ("fail_ts") so that a 429
# is not answered with yet another request on every poll.
_USAGE_TTL_SEC = 60.0
_usage_cache: dict = {"ts": 0.0, "data": None, "fail_ts": None}


async def usage_status(request: web.Request) -> web.Response:
    """Proxy the claude.ai subscription usage % (host-only — needs the local
    OAuth token), cached for ``_USAGE_TTL_SEC``.

    On a fetch failure (e.g. the endpoint's own 429) serve the last good payload
    if we have one, so a transient limit doesn't blank the dashboard; with no
    payload at all answer 502. After a failure no new fetch is attempted for
    ``_USAGE_TTL_SEC`` (stale data / 502 is served meanwhile), so a rate-limited
    endpoint is not re-hit by every poll.

    The raw OAuth read is the SHARED single source of truth
    (``legal_mcp.services.usage_limits.subscription_usage``) — the SAME reader
    the halacha drain + supervisor gate on (G1/G2; no triplicated
    endpoint/creds/UA constants). It is synchronous, so it runs in the default
    thread-pool executor to keep the aiohttp event loop responsive.
    """
    import asyncio as _asyncio

    now = time.monotonic()
    cached = _usage_cache["data"]
    if cached is not None and (now - _usage_cache["ts"]) < _USAGE_TTL_SEC:
        return web.json_response(cached)

    fail_ts = _usage_cache.get("fail_ts")
    if fail_ts is None or (now - fail_ts) >= _USAGE_TTL_SEC:
        # subscription_usage returns None on ANY failure (creds missing /
        # endpoint 429 / network) — it never throws.
        data = await _asyncio.get_running_loop().run_in_executor(
            None, usage_limits.subscription_usage)
        if data is not None:
            _usage_cache.update(ts=now, data=data, fail_ts=None)
            return web.json_response(data)
        _usage_cache["fail_ts"] = now

    # Fetch just failed, or we are still backing off from a recent failure:
    # serve the last good payload if there is one.
    stale = _usage_cache["data"]
    if stale is not None:
        return web.json_response(stale)
    return web.json_response({"error": "usage unavailable"}, status=502)


async def pm2_status(request: web.Request) -> web.Response:
    """Return a trimmed, name-sorted ``pm2 jlist`` for the legal-ai background services.

    Only apps whose name starts with one of ``_PM2_PREFIXES`` are listed. Every
    failure (pm2 missing, non-zero exit, unparseable or unexpected output)
    becomes a 502 with an ``error`` message — this endpoint never throws.
    """
    try:
        rc, out, err = await _pm2_run("jlist")
        if rc != 0:
            return web.json_response(
                {"error": f"pm2 jlist failed: {err.decode('utf-8','replace')[:200]}"},
                status=502,
            )
        apps = json.loads(out.decode("utf-8", "replace"))
        services = sorted(
            (
                _trim_service(a)
                for a in apps
                if str(a.get("name", "")).startswith(_PM2_PREFIXES)
            ),
            key=lambda s: s["name"],
        )
    except FileNotFoundError:
        return web.json_response({"error": "pm2 not found on PATH"}, status=502)
    except Exception as e:  # noqa: BLE001 — never throw
        return web.json_response({"error": f"pm2 error: {e}"}, status=502)
    return web.json_response({"services": services})


# Process control (restart/stop/start) for the dashboard's "Windows-services"
# panel. Mutating, so it requires the Bearer secret (unlike read-only /pm2).
# Whitelisted to ``legal-`` names only — never paperclip or arbitrary processes.
_PM2_ACTIONS = {"restart", "stop", "start"}

# Our own pm2 process name. Restarting/stopping ourselves kills this process
# mid-reply, so those self-actions are detached (see pm2_control).
_OWN_PM2_NAME = os.environ.get("COURT_FETCH_SERVICE_PM2_NAME", "legal-court-fetch-service")


# ─── request-body helpers shared by every POST handler ──────────────────────
async def _read_json_object(request: web.Request) -> tuple[dict, web.Response | None]:
    """Parse the request body as a JSON object.

    Returns ``(body, None)`` on success, otherwise ``({}, response)`` where
    *response* is a ready-to-return 400. Malformed JSON (or an undecodable
    body) keeps the ``"invalid JSON body"`` message; well-formed JSON that is
    not an object (list, string, number, null) is rejected here instead of
    crashing the handler later on ``body.get``.
    """
    try:
        body = await request.json()
    except ValueError:  # json.JSONDecodeError and UnicodeDecodeError both subclass it
        return {}, web.json_response({"error": "invalid JSON body"}, status=400)
    if not isinstance(body, dict):
        return {}, web.json_response({"error": "JSON body must be an object"}, status=400)
    return body, None


def _body_str(body: dict, key: str, *, strip: bool = True) -> str:
    """Return ``body[key]`` as a string; a missing key or JSON ``null`` gives ``""``.

    A plain ``str(body.get(key, ""))`` turns an explicit ``null`` into the
    truthy text ``"None"``, which slips past "field required" checks and is
    forwarded as a literal value.
    """
    value = body.get(key)
    if value is None:
        return ""
    text = str(value)
    return text.strip() if strip else text


async def _pm2_service_snapshot(name: str) -> dict | None:
    """Best-effort re-read of one pm2 process, trimmed for the dashboard.

    Returns None when ``pm2 jlist`` fails, its output can't be parsed, or no
    process has that name. Never raises: it runs after a pm2 action already
    succeeded, and a hiccup here must not turn that success into an error.
    """
    try:
        rc, out, _err = await _pm2_run("jlist")
        if rc != 0:
            return None
        apps = json.loads(out.decode("utf-8", "replace"))
        return next((_trim_service(a) for a in apps if a.get("name") == name), None)
    except Exception:  # noqa: BLE001 — best-effort; the action itself succeeded
        logger.warning("pm2 jlist re-read for %s failed", name, exc_info=True)
        return None


async def pm2_control(request: web.Request) -> web.Response:
    """Run ``pm2 <action> <name> --silent`` for a whitelisted legal-* process.

    Body ``{name, action}`` with *action* in ``_PM2_ACTIONS``; requires the
    Bearer secret (mutating). 400 on a bad body or action, 403 on a name that
    does not start with ``legal-``. Success → ``{ok: true, action, service}``
    where *service* is the process's fresh trimmed status (null if it could
    not be re-read). pm2 failures → ``ok: false`` with HTTP 502.
    """
    unauth = _check_bearer(request)
    if unauth is not None:
        return unauth
    body, bad_body = await _read_json_object(request)
    if bad_body is not None:
        return bad_body

    name = _body_str(body, "name")
    action = _body_str(body, "action")
    if action not in _PM2_ACTIONS:
        return web.json_response(
            {"error": f"action must be one of {sorted(_PM2_ACTIONS)}"}, status=400
        )
    if not name.startswith("legal-"):
        return web.json_response(
            {"error": "name must be a legal-* process"}, status=403
        )

    # Self restart/stop kills this process before it can reply (client sees a
    # dropped connection / 502) even though pm2 does perform the action. Detach
    # it with a brief delay so the HTTP response flushes first, then report
    # success optimistically. action/name reach the shell as positional
    # parameters ($1/$2), never spliced into the script text.
    if name == _OWN_PM2_NAME and action in ("restart", "stop"):
        import asyncio as _asyncio

        await _asyncio.create_subprocess_exec(
            "/bin/sh", "-c", 'sleep 1; exec pm2 "$1" "$2" --silent', "sh", action, name,
        )
        return web.json_response(
            {"ok": True, "action": action, "deferred": True, "service": None}
        )

    try:
        rc, _out, err = await _pm2_run(action, name, "--silent", timeout=30)
    except FileNotFoundError:
        return web.json_response({"ok": False, "error": "pm2 not found on PATH"}, status=502)
    except Exception as e:  # noqa: BLE001 — never throw
        return web.json_response({"ok": False, "error": f"pm2 error: {e}"}, status=502)
    if rc != 0:
        return web.json_response(
            {"ok": False,
             "error": f"pm2 {action} {name} failed: "
                      f"{err.decode('utf-8','replace')[:200]}"},
            status=502,
        )
    # Re-read just this process so the UI settles on the real new state.
    svc = await _pm2_service_snapshot(name)
    return web.json_response({"ok": True, "action": action, "service": svc})


def _bearer_ok(auth: str) -> bool:
    """True iff *auth* equals ``"Bearer " + _SHARED_SECRET`` (constant-time).

    Fails closed when no secret is configured, so ``"Bearer "`` alone can
    never authenticate. Compares UTF-8 bytes because ``hmac.compare_digest``
    raises TypeError for str arguments containing non-ASCII characters,
    which would turn a junk header into a 500 instead of a 401.
    """
    if not _SHARED_SECRET or not auth:
        return False
    expected = "Bearer " + _SHARED_SECRET
    return hmac.compare_digest(
        auth.encode("utf-8", "surrogatepass"),
        expected.encode("utf-8", "surrogatepass"),
    )


def _check_bearer(request: web.Request) -> web.Response | None:
    """Return a 401 response unless the request carries the shared Bearer token."""
    if _bearer_ok(request.headers.get("Authorization", "")):
        return None
    return web.json_response(
        {"error": "unauthorized: missing or invalid Bearer token"}, status=401
    )


async def fetch(request: web.Request) -> web.Response:
    """Fetch one administrative verdict through camofox (Bearer required).

    Body ``{file_number, month, year, case_number?, court?}``. Success →
    ``{ok: true, content_b64, filename, source_url, court}``. Expected browser
    failures and unexpected errors both come back as ``{ok: false, reason}``
    with HTTP 200 so the orchestrator can escalate; a missing required field
    is a 400.
    """
    unauth = _check_bearer(request)
    if unauth is not None:
        return unauth
    body, bad_body = await _read_json_object(request)
    if bad_body is not None:
        return bad_body

    required = ("file_number", "month", "year")
    if not all(body.get(k) for k in required):
        return web.json_response(
            {"ok": False, "reason": f"missing one of {required}"}, status=400
        )
    try:
        result = await camofox_client.fetch_admin_verdict(
            file_number=str(body["file_number"]),
            month=str(body["month"]),
            year=str(body["year"]),
            case_number=_body_str(body, "case_number", strip=False),
            court=_body_str(body, "court", strip=False),
        )
        return web.json_response({
            "ok": True,
            "content_b64": base64.b64encode(result["content"]).decode("ascii"),
            "filename": result.get("filename", ""),
            "source_url": result.get("source_url", ""),
            "court": result.get("court", ""),
        })
    except (camofox_client.CamofoxUnavailable, camofox_client.NgcsFlowError) as e:
        # Expected, recoverable failure → orchestrator escalates (INV-CF3).
        return web.json_response({"ok": False, "reason": str(e)}, status=200)
    except Exception as e:  # noqa: BLE001
        logger.exception("fetch failed")
        return web.json_response({"ok": False, "reason": f"unexpected: {e}"}, status=200)


async def plan_fetch(request: web.Request) -> web.Response:
    """Fetch one תב"ע's identity + validity from mavat (מנהל התכנון).

    Body ``{plan_number}`` → ``{ok, plan: {...}, reason}``. Same Bearer + bind
    as /fetch. The browser work (Camoufox over Xvfb past F5 ASM) lives in
    ``mavat_client``; expected failures (not found / blocked) come back
    ok=false at HTTP 200 so the caller renders a reason rather than treating
    it as a 5xx.
    """
    unauth = _check_bearer(request)
    if unauth is not None:
        return unauth
    body, bad_body = await _read_json_object(request)
    if bad_body is not None:
        return bad_body

    plan_number = _body_str(body, "plan_number")
    if not plan_number:
        return web.json_response({"ok": False, "reason": "missing plan_number"}, status=400)
    try:
        plan = await mavat_client.fetch_plan(plan_number)
        return web.json_response({"ok": True, "plan": plan})
    except (mavat_client.MavatUnavailable, mavat_client.MavatFlowError) as e:
        # Expected, recoverable (browser unavailable / plan not found / blocked).
        return web.json_response({"ok": False, "reason": str(e)}, status=200)
    except Exception as e:  # noqa: BLE001
        logger.exception("plan_fetch failed")
        return web.json_response({"ok": False, "reason": f"unexpected: {e}"}, status=200)


# ─── adapter-migration: host-side runner for scripts/migrate_agent_adapter.py ───
# The legal-ai container can't perform the migration itself (it needs the host
# filesystem — generated instruction copies, the gemini settings file — plus the
# embedded board DB), so the dashboard proxies the action here. Mutating, so it
# requires the Bearer secret like /pm2/control. We launch exactly one fixed,
# in-repo script with create_subprocess_exec (no shell) and an action allowlist;
# every other argument is passed through opaque and validated by the script
# itself. Kept deliberately symbol-light so this host bridge stays generic.
_HOST_HOME = "/home/chaim"
_REPO_ROOT = "/home/chaim/legal-ai"
_MIGRATE_SCRIPT = f"{_REPO_ROOT}/scripts/migrate_agent_adapter.py"
_MIGRATE_PYTHON = f"{_REPO_ROOT}/mcp-server/.venv/bin/python"
_MIGRATE_ACTIONS = {"check", "apply", "revert", "verify"}
_SCRIPT_RUN_LOG = f"{_REPO_ROOT}/data/logs/script-runs.log"


async def _run_host_command(
    argv: list[str], timeout: float, *, kill_on_timeout: bool = True
) -> tuple[int | None, bytes, bytes]:
    """Run *argv* (no shell) from ``_REPO_ROOT`` with HOME pinned to ``_HOST_HOME``.

    Returns ``(returncode, stdout, stderr)``. When the child outlives *timeout*
    seconds, ``asyncio.TimeoutError`` propagates; with *kill_on_timeout* the
    child is first killed and reaped (``wait_for`` alone only cancels
    ``communicate()`` and leaves it running unobserved). Launch errors
    (missing interpreter, bad cwd) propagate unchanged.
    """
    import asyncio as _asyncio

    proc = await _asyncio.create_subprocess_exec(
        *argv,
        cwd=_REPO_ROOT,
        env={**os.environ, "HOME": _HOST_HOME},
        stdout=_asyncio.subprocess.PIPE,
        stderr=_asyncio.subprocess.PIPE,
    )
    try:
        out, err = await _asyncio.wait_for(proc.communicate(), timeout=timeout)
    except _asyncio.TimeoutError:
        if kill_on_timeout:
            try:
                proc.kill()
            except ProcessLookupError:  # exited between the timeout and the kill
                pass
            await proc.wait()
        raise
    return proc.returncode, out, err


def _process_result(returncode: int | None, out: bytes, err: bytes) -> dict:
    """Shape a finished child process into the ``{ok, exit_code, stdout, stderr}`` reply."""
    return {
        "ok": returncode == 0,
        "exit_code": returncode,
        "stdout": out.decode("utf-8", "replace"),
        "stderr": err.decode("utf-8", "replace"),
    }


def _migration_argv(body: dict) -> tuple[list[str] | None, str | None]:
    """Translate an /adapter-migration body into the migration script's argv.

    Returns ``(argv, None)``, or ``(None, message)`` describing the 400.
    ``--agent`` is required for check/apply/revert; ``--to`` for check/apply,
    where the optional ``--model`` / ``--relax-tools`` are also forwarded.
    """
    action = _body_str(body, "action")
    if action not in _MIGRATE_ACTIONS:
        return None, f"action must be one of {sorted(_MIGRATE_ACTIONS)}"
    argv = [_MIGRATE_PYTHON, _MIGRATE_SCRIPT, f"--{action}"]
    agent = _body_str(body, "agent")
    target = _body_str(body, "to")
    model = _body_str(body, "model")
    if action in ("check", "apply", "revert"):
        if not agent:
            return None, "agent required"
        argv += ["--agent", agent]
    if action in ("check", "apply"):
        if not target:
            return None, "to (target) required"
        argv += ["--to", target]
        if model:
            argv += ["--model", model]
        if body.get("relax_tools"):
            argv.append("--relax-tools")
    return argv, None


async def adapter_migration(request: web.Request) -> web.Response:
    """Run scripts/migrate_agent_adapter.py on the host and relay its result.

    Requires the Bearer secret. 400 on a bad body/action/missing argument;
    504 if the script exceeds 180 s (it is deliberately left running — killing
    a mutating migration mid-write could leave it half-applied); 502 if it
    cannot be launched. Otherwise 200 with ``{ok, exit_code, stdout, stderr}``.
    """
    import asyncio as _asyncio

    unauth = _check_bearer(request)
    if unauth is not None:
        return unauth
    body, bad_body = await _read_json_object(request)
    if bad_body is not None:
        return bad_body

    argv, problem = _migration_argv(body)
    if argv is None:
        return web.json_response({"error": problem}, status=400)
    try:
        rc, out, err = await _run_host_command(argv, timeout=180, kill_on_timeout=False)
    except _asyncio.TimeoutError:
        return web.json_response({"ok": False, "error": "migration timed out"}, status=504)
    except Exception as e:  # noqa: BLE001 — never throw; relay the failure
        return web.json_response({"ok": False, "error": f"launch failed: {e}"}, status=502)
    # 200 regardless of exit code: a non-zero --check (preflight refusal) is an
    # informative result the caller renders, not a transport error.
    return web.json_response(_process_result(rc, out, err))


# ─── run-script: host-side runner for read-only/audit scripts (#4) ─────────────
# Same shape as /adapter-migration but for the SCRIPT_RUN_ALLOWLIST — a fixed set
# of read-only scripts each with a hard-coded safe argv. The request body's only
# meaningful field is ``name``; arguments are NEVER taken from the caller (so no
# --apply/--force injection). Allowlist enforcement lives here, on the host.
def _append_script_audit(name: str, exit_code: int | None) -> None:
    """Append one tab-separated ``stamp name exit=<code>`` line to the run audit log.

    Best-effort: a failure is logged as a warning and never fails the request.
    """
    try:
        os.makedirs(os.path.dirname(_SCRIPT_RUN_LOG), exist_ok=True)
        stamp = time.strftime("%Y-%m-%dT%H:%M:%S%z")
        with open(_SCRIPT_RUN_LOG, "a", encoding="utf-8") as fh:
            fh.write(f"{stamp}\t{name}\texit={exit_code}\n")
    except Exception:  # noqa: BLE001 — the audit trail is best-effort
        logger.warning("could not append to %s", _SCRIPT_RUN_LOG, exc_info=True)


async def run_script(request: web.Request) -> web.Response:
    """Run an allowlisted read-only script on the host and relay its result.

    Body ``{name}``; the argv comes solely from ``script_runner.build_argv``
    (403 when it returns None). Requires the Bearer secret. A script exceeding
    600 s is killed and 504 returned; 502 if it cannot be launched. Completed
    runs are appended to the audit log and relayed as
    ``{ok, exit_code, stdout, stderr}``.
    """
    import asyncio as _asyncio

    unauth = _check_bearer(request)
    if unauth is not None:
        return unauth
    body, bad_body = await _read_json_object(request)
    if bad_body is not None:
        return bad_body

    name = _body_str(body, "name")
    argv = script_runner.build_argv(name)
    if argv is None:
        return web.json_response(
            {"ok": False, "error": f"script not runnable (not in allowlist): {name!r}"},
            status=403,
        )
    try:
        rc, out, err = await _run_host_command(argv, timeout=600)
    except _asyncio.TimeoutError:
        return web.json_response({"ok": False, "error": "script timed out"}, status=504)
    except Exception as e:  # noqa: BLE001 — never throw; relay the failure
        return web.json_response({"ok": False, "error": f"launch failed: {e}"}, status=502)

    _append_script_audit(name, rc)
    # 200 regardless of exit code — a non-zero audit result is informative output
    # the caller renders, not a transport error.
    return web.json_response(_process_result(rc, out, err))


def build_app() -> web.Application:
    """Create the aiohttp app with every route registered (64 MiB request-body cap)."""
    app = web.Application(client_max_size=64 * 1024 * 1024)
    app.router.add_get("/health", health)
    app.router.add_get("/pm2", pm2_status)
    app.router.add_get("/usage", usage_status)
    app.router.add_post("/pm2/control", pm2_control)
    app.router.add_post("/fetch", fetch)
    app.router.add_post("/plan-fetch", plan_fetch)
    app.router.add_post("/adapter-migration", adapter_migration)
    app.router.add_post("/run-script", run_script)
    return app


# Minimum accepted length of COURT_FETCH_SHARED_SECRET; main() enforces it and
# quotes it in its error message.
_MIN_SECRET_LEN = 24


def main() -> int:
    """Parse CLI flags, validate the shared secret, then serve until stopped.

    Returns 2 when ``COURT_FETCH_SHARED_SECRET`` is empty or shorter than
    ``_MIN_SECRET_LEN`` (the service refuses to start), otherwise 0 once the
    server exits.
    """
    global _SHARED_SECRET

    parser = argparse.ArgumentParser(description="legal-court-fetch-service")
    parser.add_argument("--port", type=int, default=8771)
    parser.add_argument("--host", default="10.0.1.1",
                        help="bind address; default = docker0 bridge gateway")
    parser.add_argument("--log-level", default="INFO")
    args = parser.parse_args()

    logging.basicConfig(level=args.log_level.upper(),
                        format="%(asctime)s %(name)s %(levelname)s %(message)s")

    secret = os.environ.get("COURT_FETCH_SHARED_SECRET", "").strip()
    if not secret:
        logger.error(
            "COURT_FETCH_SHARED_SECRET is empty; refusing to start. Set it in "
            "/home/chaim/.legal-court-fetch-service.env (loaded by pm2) and "
            "mirror it as a Coolify env var on the legal-ai app."
        )
        return 2
    if len(secret) < _MIN_SECRET_LEN:
        logger.error("COURT_FETCH_SHARED_SECRET too short (>=%d chars expected).",
                     _MIN_SECRET_LEN)
        return 2

    _SHARED_SECRET = secret
    app = build_app()
    logger.info("legal-court-fetch-service listening on %s:%d", args.host, args.port)
    web.run_app(app, host=args.host, port=args.port, print=lambda _m: None)
    return 0


if __name__ == "__main__":
    sys.exit(main())