"""HTTP+SSE bridge from FastAPI (in container) to local claude CLI. Endpoints: POST /chat/start — body: {prompt, system?, resume_session_id?} returns SSE stream of events from ``claude_session.query_streaming``. REQUIRES Authorization: Bearer . GET /health — liveness probe (no auth — used by FastAPI for status). Run with pm2: pm2 start scripts/legal-chat-service.config.cjs Standalone for dev: cd ~/legal-ai/mcp-server LEGAL_CHAT_SHARED_SECRET=... .venv/bin/python -m legal_mcp.chat_service.server \ --port 8770 --host 10.0.1.1 Security posture ---------------- 1. Bind defaults to ``10.0.1.1`` — the host's docker0 bridge gateway. Containers on docker bridges (including the legal-ai container, which sits on the ``coolify`` network but routes to docker0 at the host) can reach this address; processes outside the host cannot. Binding to ``0.0.0.0`` is permitted but discouraged (relies on the cloud-level firewall as the sole perimeter). 2. ``/chat/start`` requires a ``Authorization: Bearer `` header. The secret is loaded from the environment; without it set, the server refuses to start (no fallback to "open" mode, by design — the claude CLI it spawns can run arbitrary tool calls, so an unauthenticated /chat/start is RCE-equivalent). 3. ``/health`` is intentionally unauthenticated so the FastAPI proxy can probe liveness with no token. It returns only a static OK and never spawns subprocesses, so it can't be abused. """ from __future__ import annotations import argparse import asyncio import json import logging import os import sys from typing import Any from aiohttp import web # Run-via-CLI bootstrap so ``python -m legal_mcp.chat_service.server`` # works even when the package isn't installed (it is in the venv, but # this safeguard keeps the entrypoint robust). 
_pkg_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
if _pkg_root not in sys.path:
    sys.path.insert(0, _pkg_root)

from legal_mcp.services import claude_session  # noqa: E402

logger = logging.getLogger("legal_chat_service")

# Loaded once at startup. Validated to be non-empty in main(); the handler
# uses a constant-time compare to avoid timing oracles on a short input.
_SHARED_SECRET: str = ""

# Subprocess timeout (seconds) used when the request omits ``timeout``.
_DEFAULT_TIMEOUT_SECONDS = 3600


async def health(request: web.Request) -> web.Response:
    """Liveness probe: return a static JSON OK payload (no auth required)."""
    return web.json_response({"ok": True, "service": "legal-chat-service"})


def _json_error(message: str, status: int) -> web.Response:
    """Build a ``{"error": message}`` JSON response with the given status."""
    return web.json_response({"error": message}, status=status)


def _check_bearer(request: web.Request) -> web.Response | None:
    """Validate ``Authorization: Bearer <secret>``.

    Returns:
        ``None`` when the header matches the configured shared secret,
        otherwise a 401 JSON response the caller should return as-is.
    """
    import hmac

    unauthorized = _json_error(
        "unauthorized: missing or invalid Bearer token", 401
    )

    # Fail closed if the secret was never configured (e.g. the app was built
    # via build_app() without going through main()); otherwise the expected
    # value would degrade to the bare "Bearer " prefix.
    if not _SHARED_SECRET:
        return unauthorized

    auth = request.headers.get("Authorization", "")
    if not auth:
        return unauthorized

    expected = "Bearer " + _SHARED_SECRET
    # ``compare_digest`` defends against timing attacks. Strings of different
    # length still leak length, but for a 43-char urlsafe token that's
    # uninteresting and the auth scheme prefix anchors it anyway.
    #
    # Compare bytes, not str: compare_digest() raises TypeError when given a
    # str containing non-ASCII characters, which would turn a malformed
    # header into a 500 instead of a 401. "surrogateescape" keeps the encode
    # itself from failing on undecodable bytes that were smuggled into a str.
    if not hmac.compare_digest(
        auth.encode("utf-8", "surrogateescape"),
        expected.encode("utf-8", "surrogateescape"),
    ):
        return unauthorized
    return None


def _parse_timeout(raw: Any) -> int | None:
    """Coerce the request's ``timeout`` field to a positive int.

    A missing or falsy value (``None``, ``0``, ``""``) selects the default.
    Returns ``None`` when the value cannot be converted to an integer or is
    not positive, so the caller can answer 400 instead of crashing.
    """
    if not raw:
        return _DEFAULT_TIMEOUT_SECONDS
    try:
        seconds = int(raw)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: int(float("inf")), reachable via JSON ``Infinity``.
        return None
    return seconds if seconds > 0 else None


async def chat_start(request: web.Request) -> web.StreamResponse:
    """Drive ``claude_session.query_streaming`` and forward events as SSE.

    Request body (JSON object):
        prompt: str                    — required, user message
        system: str | None             — system instructions (ignored if resuming)
        resume_session_id: str | None  — continue a prior CLI session
        timeout: int = 3600            — hard timeout for the subprocess

    Returns:
        401 if the Bearer check fails; 400 for a body that is not valid JSON,
        is not a JSON object, has no non-blank string ``prompt``, or has an
        unusable ``timeout``; otherwise a ``text/event-stream`` response with
        one ``data:`` frame per event, ending after the first event whose
        ``type`` is ``"done"`` or ``"error"``.

    Raises:
        asyncio.CancelledError: re-raised when the handler task is cancelled,
            so the cancellation is not swallowed.
    """
    unauth = _check_bearer(request)
    if unauth is not None:
        return unauth

    try:
        body = await request.json()
    except (json.JSONDecodeError, UnicodeDecodeError):
        return _json_error("invalid JSON body", 400)
    # Valid JSON is not necessarily an object; ``[]`` or ``"x"`` has no .get().
    if not isinstance(body, dict):
        return _json_error("JSON body must be an object", 400)

    prompt = body.get("prompt") or ""
    if not isinstance(prompt, str) or not prompt.strip():
        return _json_error("prompt is required", 400)

    timeout = _parse_timeout(body.get("timeout"))
    if timeout is None:
        return _json_error("timeout must be a positive integer", 400)

    system = body.get("system")
    resume_session_id = body.get("resume_session_id")

    response = web.StreamResponse(
        status=200,
        reason="OK",
        headers={
            "Content-Type": "text/event-stream",
            "Cache-Control": "no-cache, no-transform",
            "Connection": "keep-alive",
            # X-Accel-Buffering=no defeats nginx/traefik buffering — the
            # FastAPI container proxies via httpx and forwards bytes as
            # they arrive, but the inner header is harmless and makes
            # browser-direct testing easier.
            "X-Accel-Buffering": "no",
        },
    )
    await response.prepare(request)

    async def send_event(payload: dict[str, Any]) -> None:
        """Write one SSE ``data:`` frame carrying ``payload`` as JSON."""
        line = f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"
        await response.write(line.encode("utf-8"))

    try:
        async for event in claude_session.query_streaming(
            prompt,
            system=system,
            resume_session_id=resume_session_id,
            timeout=timeout,
        ):
            await send_event(event)
            if event.get("type") in ("done", "error"):
                break
    except asyncio.CancelledError:
        # Client disconnected (or the server is shutting down). Log it, then
        # propagate: swallowing cancellation hides it from the task's owner.
        logger.info("chat_start: client disconnected")
        raise
    except ConnectionResetError:
        # The peer went away mid-write; nobody is left to receive an error
        # event, so treat it as a disconnect rather than a streaming failure.
        logger.info("chat_start: client disconnected")
    except Exception as exc:
        logger.exception("chat_start: streaming failed")
        try:
            await send_event({"type": "error", "message": str(exc)})
        except ConnectionResetError:
            pass

    try:
        await response.write_eof()
    except ConnectionResetError:
        pass
    return response


def build_app() -> web.Application:
    """Create the aiohttp application with ``/health`` and ``/chat/start``."""
    app = web.Application()
    app.router.add_get("/health", health)
    app.router.add_post("/chat/start", chat_start)
    return app


def main() -> int:
    """Parse CLI args, load the shared secret, and run the server.

    Returns:
        ``2`` if ``LEGAL_CHAT_SHARED_SECRET`` is empty or shorter than 24
        characters (the server refuses to start); ``0`` once
        ``web.run_app`` returns.
    """
    global _SHARED_SECRET

    parser = argparse.ArgumentParser(description="legal-chat-service")
    parser.add_argument("--port", type=int, default=8770)
    parser.add_argument(
        "--host",
        default="10.0.1.1",
        help=(
            "bind address. Default 10.0.1.1 = docker0 bridge gateway — "
            "reachable from containers, invisible to non-host networks. "
            "Use 127.0.0.1 for host-local dev; do not bind 0.0.0.0 "
            "without a separate perimeter firewall."
        ),
    )
    parser.add_argument("--log-level", default="INFO")
    args = parser.parse_args()

    logging.basicConfig(
        level=args.log_level.upper(),
        format="%(asctime)s %(name)s %(levelname)s %(message)s",
    )

    secret = os.environ.get("LEGAL_CHAT_SHARED_SECRET", "").strip()
    if not secret:
        logger.error(
            "LEGAL_CHAT_SHARED_SECRET is empty; refusing to start. "
            "Set it in /home/chaim/.legal-chat-service.env (loaded by "
            "pm2) and mirror it as a Coolify env var on the legal-ai app."
        )
        return 2
    if len(secret) < 24:
        logger.error(
            "LEGAL_CHAT_SHARED_SECRET is too short (got %d chars); "
            "refusing to start. Use >=32 chars (e.g. python3 -c "
            "'import secrets; print(secrets.token_urlsafe(32))').",
            len(secret),
        )
        return 2
    _SHARED_SECRET = secret

    app = build_app()
    logger.info("legal-chat-service listening on %s:%d", args.host, args.port)
    # run_app's startup banner is silenced; the logger line above replaces it.
    web.run_app(app, host=args.host, port=args.port, print=lambda _msg: None)
    return 0


if __name__ == "__main__":
    sys.exit(main())