Files
legal-ai/mcp-server/src/legal_mcp/court_fetch_service/server.py
Chaim d23f854c25 fix(ops): self-restart/stop of the host bridge returns 200 (detached)
Restarting/stopping legal-court-fetch-service from its own /pm2/control kills
the process before it can reply — the client got a misleading 502 even though
pm2 performed the restart. Detach the self-action (sleep 1; pm2 ...) so the HTTP
response flushes first, and report success optimistically. Other targets are
unchanged. Own name via COURT_FETCH_SERVICE_PM2_NAME (default legal-court-fetch-service).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-08 09:09:08 +00:00

276 lines
11 KiB
Python

"""Host-side HTTP bridge for Tier-1 verdict fetching (X13).
Mirrors ``legal_mcp.chat_service.server`` — the proven host-side pattern: an
aiohttp app, bound to the docker bridge gateway, Bearer-auth, that does the one
thing the container can't (here: drive a real browser against נט המשפט).
Endpoints:
POST /fetch body {file_number, month, year, case_number, court}
{ok, content_b64, filename, source_url, court, reason}
REQUIRES Authorization: Bearer <COURT_FETCH_SHARED_SECRET>.
GET /health liveness (no auth); reports camofox + VNC URL if available.
GET /pm2 read-only pm2 status of legal-* / paperclip services (no auth).
POST /pm2/control body {name, action: restart|stop|start} → run pm2 on a
whitelisted legal-* process. REQUIRES Bearer (mutating).
Run with pm2:
pm2 start scripts/legal-court-fetch-service.config.cjs
Security posture (identical rationale to legal-chat-service):
1. Bind defaults to ``10.0.1.1`` (docker0 bridge gateway) — reachable from
the host + containers on docker bridges, invisible to outside networks.
2. ``/fetch`` requires a Bearer token (constant-time compare); the service
refuses to start without ``COURT_FETCH_SHARED_SECRET`` set.
3. ``/health`` is unauthenticated and spawns nothing.
"""
from __future__ import annotations
import argparse
import base64
import hmac
import json
import logging
import os
import sys
from aiohttp import web
_pkg_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
if _pkg_root not in sys.path:
sys.path.insert(0, _pkg_root)
from legal_mcp.court_fetch_service import camofox_client # noqa: E402
logger = logging.getLogger("legal_court_fetch_service")
_SHARED_SECRET: str = ""
async def health(request: web.Request) -> web.Response:
info = {"ok": True, "service": "legal-court-fetch-service",
"camofox_enabled": camofox_client.is_enabled()}
if camofox_client.is_enabled():
try:
info["camofox"] = await camofox_client.health()
except Exception as e: # health must never throw
info["camofox_error"] = str(e)
return web.json_response(info)
# Background services we surface on the /operations dashboard. pm2 jlist is a
# host-only capability (the legal-ai container can't run pm2), so the container's
# FastAPI proxies this read-only endpoint over the docker bridge. No secret:
# pm2 status (names/cpu/mem) carries nothing sensitive and the bind (10.0.1.1)
# is already host/container-only.
_PM2_PREFIXES = ("legal-", "paperclip")
def _trim_service(a: dict) -> dict:
"""Project a pm2 jlist app entry into the fields the dashboard needs."""
env = a.get("pm2_env", {}) or {}
return {
"name": a.get("name", ""),
"status": env.get("status", ""),
"restarts": env.get("restart_time", 0),
"uptime_ms": env.get("pm_uptime", 0),
"cpu": (a.get("monit") or {}).get("cpu", 0),
"memory_bytes": (a.get("monit") or {}).get("memory", 0),
"cron": env.get("cron_restart") or "",
"autorestart": env.get("autorestart", True),
}
async def _pm2_run(*args: str, timeout: float = 10) -> tuple[int, bytes, bytes]:
"""Run a pm2 subcommand; returns (returncode, stdout, stderr)."""
import asyncio as _asyncio
proc = await _asyncio.create_subprocess_exec(
"pm2", *args,
stdout=_asyncio.subprocess.PIPE, stderr=_asyncio.subprocess.PIPE,
)
out, err = await _asyncio.wait_for(proc.communicate(), timeout=timeout)
return proc.returncode or 0, out, err
async def pm2_status(request: web.Request) -> web.Response:
"""Return a trimmed ``pm2 jlist`` for the legal-ai background services."""
try:
rc, out, err = await _pm2_run("jlist")
if rc != 0:
return web.json_response(
{"error": f"pm2 jlist failed: {err.decode('utf-8','replace')[:200]}"},
status=502,
)
apps = json.loads(out.decode("utf-8", "replace"))
except FileNotFoundError:
return web.json_response({"error": "pm2 not found on PATH"}, status=502)
except Exception as e: # never throw
return web.json_response({"error": f"pm2 error: {e}"}, status=502)
services = [
_trim_service(a) for a in apps
if any(str(a.get("name", "")).startswith(p) for p in _PM2_PREFIXES)
]
services.sort(key=lambda s: s["name"])
return web.json_response({"services": services})
# Process control (restart/stop/start) for the dashboard's "Windows-services"
# panel. Mutating, so it requires the Bearer secret (unlike read-only /pm2).
# Whitelisted to ``legal-`` names only — never paperclip or arbitrary processes.
_PM2_ACTIONS = {"restart", "stop", "start"}
# Our own pm2 process name. Restarting/stopping ourselves kills this process
# mid-reply, so those self-actions are detached (see pm2_control).
_OWN_PM2_NAME = os.environ.get("COURT_FETCH_SERVICE_PM2_NAME", "legal-court-fetch-service")
async def pm2_control(request: web.Request) -> web.Response:
"""Run ``pm2 <action> <name>`` for a whitelisted legal-* process."""
unauth = _check_bearer(request)
if unauth is not None:
return unauth
try:
body = await request.json()
except json.JSONDecodeError:
return web.json_response({"error": "invalid JSON body"}, status=400)
name = str(body.get("name", "")).strip()
action = str(body.get("action", "")).strip()
if action not in _PM2_ACTIONS:
return web.json_response(
{"error": f"action must be one of {sorted(_PM2_ACTIONS)}"}, status=400
)
if not name.startswith("legal-"):
return web.json_response(
{"error": "name must be a legal-* process"}, status=403
)
# Self restart/stop kills this process before it can reply (client sees a
# dropped connection / 502) even though pm2 does perform the action. Detach
# it with a brief delay so the HTTP response flushes first, then report
# success optimistically.
if name == _OWN_PM2_NAME and action in ("restart", "stop"):
import asyncio as _asyncio
await _asyncio.create_subprocess_shell(f"sleep 1; pm2 {action} {name} --silent")
return web.json_response(
{"ok": True, "action": action, "deferred": True, "service": None}
)
try:
rc, out, err = await _pm2_run(action, name, "--silent", timeout=30)
if rc != 0:
return web.json_response(
{"ok": False,
"error": f"pm2 {action} {name} failed: "
f"{err.decode('utf-8','replace')[:200]}"},
status=502,
)
# Re-read just this process so the UI settles on the real new state.
rc2, out2, _ = await _pm2_run("jlist")
svc = None
if rc2 == 0:
for a in json.loads(out2.decode("utf-8", "replace")):
if a.get("name") == name:
svc = _trim_service(a)
break
return web.json_response({"ok": True, "action": action, "service": svc})
except FileNotFoundError:
return web.json_response({"error": "pm2 not found on PATH"}, status=502)
except Exception as e: # never throw
return web.json_response({"ok": False, "error": f"pm2 error: {e}"}, status=502)
def _check_bearer(request: web.Request) -> web.Response | None:
auth = request.headers.get("Authorization", "")
expected = "Bearer " + _SHARED_SECRET
if not auth or not hmac.compare_digest(auth, expected):
return web.json_response(
{"error": "unauthorized: missing or invalid Bearer token"}, status=401
)
return None
async def fetch(request: web.Request) -> web.Response:
unauth = _check_bearer(request)
if unauth is not None:
return unauth
try:
body = await request.json()
except json.JSONDecodeError:
return web.json_response({"error": "invalid JSON body"}, status=400)
required = ("file_number", "month", "year")
if not all(body.get(k) for k in required):
return web.json_response(
{"ok": False, "reason": f"missing one of {required}"}, status=400
)
try:
result = await camofox_client.fetch_admin_verdict(
file_number=str(body["file_number"]),
month=str(body["month"]),
year=str(body["year"]),
case_number=str(body.get("case_number", "")),
court=str(body.get("court", "")),
)
return web.json_response({
"ok": True,
"content_b64": base64.b64encode(result["content"]).decode("ascii"),
"filename": result.get("filename", ""),
"source_url": result.get("source_url", ""),
"court": result.get("court", ""),
})
except (camofox_client.CamofoxUnavailable, camofox_client.NgcsFlowError) as e:
# Expected, recoverable failure → orchestrator escalates (INV-CF3).
return web.json_response({"ok": False, "reason": str(e)}, status=200)
except Exception as e: # noqa: BLE001
logger.exception("fetch failed")
return web.json_response({"ok": False, "reason": f"unexpected: {e}"}, status=200)
def build_app() -> web.Application:
app = web.Application(client_max_size=64 * 1024 * 1024)
app.router.add_get("/health", health)
app.router.add_get("/pm2", pm2_status)
app.router.add_post("/pm2/control", pm2_control)
app.router.add_post("/fetch", fetch)
return app
def main() -> int:
parser = argparse.ArgumentParser(description="legal-court-fetch-service")
parser.add_argument("--port", type=int, default=8771)
parser.add_argument("--host", default="10.0.1.1",
help="bind address; default = docker0 bridge gateway")
parser.add_argument("--log-level", default="INFO")
args = parser.parse_args()
logging.basicConfig(level=args.log_level.upper(),
format="%(asctime)s %(name)s %(levelname)s %(message)s")
secret = os.environ.get("COURT_FETCH_SHARED_SECRET", "").strip()
if not secret:
logger.error(
"COURT_FETCH_SHARED_SECRET is empty; refusing to start. Set it in "
"/home/chaim/.legal-court-fetch-service.env (loaded by pm2) and "
"mirror it as a Coolify env var on the legal-ai app."
)
return 2
if len(secret) < 24:
logger.error("COURT_FETCH_SHARED_SECRET too short (>=32 chars expected).")
return 2
global _SHARED_SECRET
_SHARED_SECRET = secret
app = build_app()
logger.info("legal-court-fetch-service listening on %s:%d", args.host, args.port)
web.run_app(app, host=args.host, port=args.port, print=lambda _m: None)
return 0
if __name__ == "__main__":
sys.exit(main())