feat(scripts): כפתור "הרץ" מ-UI לסקריפטי read-only (קטגוריה B #4)
הרצת-סקריפט-מ-UI ב-/scripts, רק לסקריפטי קריאה-בלבד/אודיט, דרך גשר-המארח
הקיים (court-fetch) — שיכפול דפוס /adapter-migration.
אבטחה:
- allowlist בצד-המארח (services/script_runner.py, מקור-אמת יחיד): name→argv
קבוע ובטוח. 5 סקריפטים מאומתים: leak_guard, check_undefined_names,
storage_leak_tripwire, audit_training_corpus, audit_corpus_integrity --no-notify.
- הגשר (court_fetch_service/server.py): endpoint POST /run-script, Bearer-auth,
ולידציית-allowlist, create_subprocess_exec (ללא shell), timeout 600s,
audit-log; מתעלם מארגומנטים מהבקשה (argv מה-allowlist בלבד).
- קונטיינר (web/app.py): POX /api/scripts/{name}/run proxy ל-גשר + pre-check
allowlist; הרחבת /api/scripts/catalog ב-runnable_scripts.
- UI: כפתור "הרץ" (ירוק) רק לשורות-runnable + דיאלוג-פלט (exit-code+stdout);
שאר השורות "מקור" בלבד. confirm לפני הרצה.
מאושר דרך שער-העיצוב (מוקאפ 16-scripts עודכן עם כפתור "הרץ").
G12: leak_guard עובר (אין סמלי-פלטפורמה בשכבת-האינטליגנציה).
Deploy דו-שלבי: גשר-המארח דורש git pull בעותק-המארח + pm2 restart
legal-court-fetch-service; הקונטיינר נפרס דרך Coolify כרגיל.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -44,6 +44,7 @@ if _pkg_root not in sys.path:
|
||||
|
||||
from legal_mcp.court_fetch_service import camofox_client # noqa: E402
|
||||
from legal_mcp.services import usage_limits # noqa: E402
|
||||
from legal_mcp.services import script_runner # noqa: E402
|
||||
|
||||
logger = logging.getLogger("legal_court_fetch_service")
|
||||
|
||||
@@ -342,6 +343,62 @@ async def adapter_migration(request: web.Request) -> web.Response:
|
||||
})
|
||||
|
||||
|
||||
# ─── run-script: host-side runner for read-only/audit scripts (#4) ─────────────
|
||||
# Same shape as /adapter-migration but for the SCRIPT_RUN_ALLOWLIST — a fixed set
|
||||
# of read-only scripts each with a hard-coded safe argv. The request body's only
|
||||
# meaningful field is ``name``; arguments are NEVER taken from the caller (so no
|
||||
# --apply/--force injection). Allowlist enforcement lives here, on the host.
|
||||
async def run_script(request: web.Request) -> web.Response:
|
||||
"""Run an allowlisted read-only script on the host and relay its result."""
|
||||
unauth = _check_bearer(request)
|
||||
if unauth is not None:
|
||||
return unauth
|
||||
try:
|
||||
body = await request.json()
|
||||
except json.JSONDecodeError:
|
||||
return web.json_response({"error": "invalid JSON body"}, status=400)
|
||||
|
||||
name = str(body.get("name", "")).strip()
|
||||
argv = script_runner.build_argv(name)
|
||||
if argv is None:
|
||||
return web.json_response(
|
||||
{"ok": False, "error": f"script not runnable (not in allowlist): {name!r}"},
|
||||
status=403,
|
||||
)
|
||||
|
||||
import asyncio as _asyncio
|
||||
|
||||
env = {**os.environ, "HOME": "/home/chaim"}
|
||||
try:
|
||||
proc = await _asyncio.create_subprocess_exec(
|
||||
*argv, cwd="/home/chaim/legal-ai", env=env,
|
||||
stdout=_asyncio.subprocess.PIPE, stderr=_asyncio.subprocess.PIPE,
|
||||
)
|
||||
out, err = await _asyncio.wait_for(proc.communicate(), timeout=600)
|
||||
except _asyncio.TimeoutError:
|
||||
return web.json_response({"ok": False, "error": "script timed out"}, status=504)
|
||||
except Exception as e: # never throw — relay the failure
|
||||
return web.json_response({"ok": False, "error": f"launch failed: {e}"}, status=502)
|
||||
|
||||
# best-effort audit trail — one line per run
|
||||
try:
|
||||
os.makedirs("/home/chaim/legal-ai/data/logs", exist_ok=True)
|
||||
stamp = time.strftime("%Y-%m-%dT%H:%M:%S%z")
|
||||
with open("/home/chaim/legal-ai/data/logs/script-runs.log", "a") as fh:
|
||||
fh.write(f"{stamp}\t{name}\texit={proc.returncode}\n")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# 200 regardless of exit code — a non-zero audit result is informative output
|
||||
# the caller renders, not a transport error.
|
||||
return web.json_response({
|
||||
"ok": (proc.returncode == 0),
|
||||
"exit_code": proc.returncode,
|
||||
"stdout": out.decode("utf-8", "replace"),
|
||||
"stderr": err.decode("utf-8", "replace"),
|
||||
})
|
||||
|
||||
|
||||
def build_app() -> web.Application:
|
||||
app = web.Application(client_max_size=64 * 1024 * 1024)
|
||||
app.router.add_get("/health", health)
|
||||
@@ -350,6 +407,7 @@ def build_app() -> web.Application:
|
||||
app.router.add_post("/pm2/control", pm2_control)
|
||||
app.router.add_post("/fetch", fetch)
|
||||
app.router.add_post("/adapter-migration", adapter_migration)
|
||||
app.router.add_post("/run-script", run_script)
|
||||
return app
|
||||
|
||||
|
||||
|
||||
50
mcp-server/src/legal_mcp/services/script_runner.py
Normal file
50
mcp-server/src/legal_mcp/services/script_runner.py
Normal file
@@ -0,0 +1,50 @@
|
||||
"""Allowlist of read-only scripts runnable from the /scripts page (#4).
|
||||
|
||||
Single source of truth shared by BOTH:
|
||||
- the host bridge (``court_fetch_service.server`` — the ENFORCER that actually
|
||||
launches the process), and
|
||||
- the container API (``web/app.py`` — display-only: tells the UI which rows get
|
||||
a "הרץ" button).
|
||||
|
||||
Each entry maps a script's basename to the EXACT, fixed argument list it runs
|
||||
with. **Read-only / audit scripts only**, with a safe fixed argv and **no
|
||||
user-supplied arguments** — never ``--apply``/``--force``. The system is
|
||||
single-user/internal, so this allowlist is a footgun-guard, not an auth boundary;
|
||||
enforcement lives on the trusted host side.
|
||||
|
||||
Adding an entry is a security decision: verify the script is read-only (no DB
|
||||
writes, no destructive side effects) with the given argv before listing it.
|
||||
|
||||
stdlib-only on purpose, so the lightweight host bridge can import it cheaply.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
_REPO = "/home/chaim/legal-ai"
|
||||
_PYTHON = f"{_REPO}/mcp-server/.venv/bin/python"
|
||||
|
||||
# basename → argv tail (script path relative to the repo root, then fixed flags).
|
||||
# Verified read-only 2026-06-17. `audit_corpus_integrity` runs with --no-notify
|
||||
# so it stays report-only (no notification side-effect) when run from the dashboard.
|
||||
SCRIPT_RUN_ALLOWLIST: dict[str, list[str]] = {
|
||||
"leak_guard.py": ["scripts/leak_guard.py"],
|
||||
"check_undefined_names.py": ["scripts/check_undefined_names.py"],
|
||||
"storage_leak_tripwire.py": ["scripts/storage_leak_tripwire.py"],
|
||||
"audit_training_corpus.py": ["scripts/audit_training_corpus.py"],
|
||||
"audit_corpus_integrity.py": ["scripts/audit_corpus_integrity.py", "--no-notify"],
|
||||
}
|
||||
|
||||
|
||||
def runnable_names() -> list[str]:
|
||||
"""Sorted basenames the UI may show a "הרץ" button for (display-only)."""
|
||||
return sorted(SCRIPT_RUN_ALLOWLIST)
|
||||
|
||||
|
||||
def build_argv(name: str) -> list[str] | None:
|
||||
"""Full argv (python + absolute script path + fixed flags) for an allowlisted
|
||||
script, or ``None`` when *name* is not allowlisted. Arguments are taken ONLY
|
||||
from the allowlist — anything the caller passes is ignored."""
|
||||
tail = SCRIPT_RUN_ALLOWLIST.get(name)
|
||||
if tail is None:
|
||||
return None
|
||||
return [_PYTHON, f"{_REPO}/{tail[0]}", *tail[1:]]
|
||||
Reference in New Issue
Block a user