Merge pull request 'feat(scripts): כפתור "הרץ" מ-UI לסקריפטי read-only (קטגוריה B #4)' (#283) from worktree-scripts-run-ui into main
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 1m34s
G12 Leak-Guard / leak-guard (push) Successful in 4s
Lint — undefined names / undefined-names (push) Successful in 10s

This commit was merged in pull request #283.
This commit is contained in:
2026-06-17 04:31:19 +00:00
5 changed files with 315 additions and 25 deletions

View File

@@ -44,6 +44,7 @@ if _pkg_root not in sys.path:
from legal_mcp.court_fetch_service import camofox_client # noqa: E402
from legal_mcp.services import usage_limits # noqa: E402
from legal_mcp.services import script_runner # noqa: E402
logger = logging.getLogger("legal_court_fetch_service")
@@ -342,6 +343,62 @@ async def adapter_migration(request: web.Request) -> web.Response:
})
# ─── run-script: host-side runner for read-only/audit scripts (#4) ─────────────
# Same shape as /adapter-migration but for the SCRIPT_RUN_ALLOWLIST — a fixed set
# of read-only scripts each with a hard-coded safe argv. The request body's only
# meaningful field is ``name``; arguments are NEVER taken from the caller (so no
# --apply/--force injection). Allowlist enforcement lives here, on the host.
async def run_script(request: web.Request) -> web.Response:
"""Run an allowlisted read-only script on the host and relay its result."""
unauth = _check_bearer(request)
if unauth is not None:
return unauth
try:
body = await request.json()
except json.JSONDecodeError:
return web.json_response({"error": "invalid JSON body"}, status=400)
name = str(body.get("name", "")).strip()
argv = script_runner.build_argv(name)
if argv is None:
return web.json_response(
{"ok": False, "error": f"script not runnable (not in allowlist): {name!r}"},
status=403,
)
import asyncio as _asyncio
env = {**os.environ, "HOME": "/home/chaim"}
try:
proc = await _asyncio.create_subprocess_exec(
*argv, cwd="/home/chaim/legal-ai", env=env,
stdout=_asyncio.subprocess.PIPE, stderr=_asyncio.subprocess.PIPE,
)
out, err = await _asyncio.wait_for(proc.communicate(), timeout=600)
except _asyncio.TimeoutError:
return web.json_response({"ok": False, "error": "script timed out"}, status=504)
except Exception as e: # never throw — relay the failure
return web.json_response({"ok": False, "error": f"launch failed: {e}"}, status=502)
# best-effort audit trail — one line per run
try:
os.makedirs("/home/chaim/legal-ai/data/logs", exist_ok=True)
stamp = time.strftime("%Y-%m-%dT%H:%M:%S%z")
with open("/home/chaim/legal-ai/data/logs/script-runs.log", "a") as fh:
fh.write(f"{stamp}\t{name}\texit={proc.returncode}\n")
except Exception:
pass
# 200 regardless of exit code — a non-zero audit result is informative output
# the caller renders, not a transport error.
return web.json_response({
"ok": (proc.returncode == 0),
"exit_code": proc.returncode,
"stdout": out.decode("utf-8", "replace"),
"stderr": err.decode("utf-8", "replace"),
})
def build_app() -> web.Application:
app = web.Application(client_max_size=64 * 1024 * 1024)
app.router.add_get("/health", health)
@@ -350,6 +407,7 @@ def build_app() -> web.Application:
app.router.add_post("/pm2/control", pm2_control)
app.router.add_post("/fetch", fetch)
app.router.add_post("/adapter-migration", adapter_migration)
app.router.add_post("/run-script", run_script)
return app

View File

@@ -0,0 +1,50 @@
"""Allowlist of read-only scripts runnable from the /scripts page (#4).
Single source of truth shared by BOTH:
- the host bridge (``court_fetch_service.server`` — the ENFORCER that actually
launches the process), and
- the container API (``web/app.py`` — display-only: tells the UI which rows get
a "הרץ" button).
Each entry maps a script's basename to the EXACT, fixed argument list it runs
with. **Read-only / audit scripts only**, with a safe fixed argv and **no
user-supplied arguments** — never ``--apply``/``--force``. The system is
single-user/internal, so this allowlist is a footgun-guard, not an auth boundary;
enforcement lives on the trusted host side.
Adding an entry is a security decision: verify the script is read-only (no DB
writes, no destructive side effects) with the given argv before listing it.
stdlib-only on purpose, so the lightweight host bridge can import it cheaply.
"""
from __future__ import annotations
_REPO = "/home/chaim/legal-ai"
_PYTHON = f"{_REPO}/mcp-server/.venv/bin/python"
# basename → argv tail (script path relative to the repo root, then fixed flags).
# Verified read-only 2026-06-17. `audit_corpus_integrity` runs with --no-notify
# so it stays report-only (no notification side-effect) when run from the dashboard.
SCRIPT_RUN_ALLOWLIST: dict[str, list[str]] = {
"leak_guard.py": ["scripts/leak_guard.py"],
"check_undefined_names.py": ["scripts/check_undefined_names.py"],
"storage_leak_tripwire.py": ["scripts/storage_leak_tripwire.py"],
"audit_training_corpus.py": ["scripts/audit_training_corpus.py"],
"audit_corpus_integrity.py": ["scripts/audit_corpus_integrity.py", "--no-notify"],
}
def runnable_names() -> list[str]:
"""Sorted basenames the UI may show a "הרץ" button for (display-only)."""
return sorted(SCRIPT_RUN_ALLOWLIST)
def build_argv(name: str) -> list[str] | None:
"""Full argv (python + absolute script path + fixed flags) for an allowlisted
script, or ``None`` when *name* is not allowlisted. Arguments are taken ONLY
from the allowlist — anything the caller passes is ignored."""
tail = SCRIPT_RUN_ALLOWLIST.get(name)
if tail is None:
return None
return [_PYTHON, f"{_REPO}/{tail[0]}", *tail[1:]]