"""Claude Code session bridge — runs prompts via the local `claude` CLI. All LLM calls in legal-ai go through this module. We shell out to the local Claude Code CLI which uses the developer's claude.ai session — zero direct API cost. **Architectural rule (do not violate):** this module only works when invoked from the local MCP server (the Python process at `/home/chaim/legal-ai/mcp-server/`, launched per `~/.claude.json`). It will **not** work when called from the legal-ai Docker container — that container has no `claude` CLI and no claude.ai session. Any code path under `web/` (FastAPI) that calls this module — directly or via an extractor like `halacha_extractor`, `claims_extractor`, `precedent_metadata_extractor`, `block_writer`, `qa_validator`, `learning_loop`, `local_classifier`, `appraiser_facts_extractor`, `brainstorm`, `style_analyzer` — is wrong. LLM-dependent operations must be exposed as MCP tools and triggered from agents (or the chair via Claude Code), where this module runs locally with CLI access. Async history: originally synchronous (``subprocess.run``) with a 120 s timeout. That broke for large legal documents — sync subprocess stalled the asyncio loop, and 120 s was far too short for cold-cache Hebrew prompts (case 8174-24 hit three timeouts in a row). Fixed by going async with a 30-minute ceiling. """ from __future__ import annotations import asyncio import json import logging import os from legal_mcp.config import parse_llm_json logger = logging.getLogger(__name__) # Default ceiling for any single ``claude -p`` invocation, in seconds. # 30 min covers any single-document call we make in practice (chunking # handles the rest); the bound exists only to prevent runaway zombies. 
DEFAULT_TIMEOUT = 1800  # seconds; ceiling for a single ``claude -p`` invocation
LONG_TIMEOUT = 3600  # opus block writing on full case context

# #85 — two complementary hardenings for the same symptom (`claude -p` failing
# with a fast non-zero exit + empty stderr on large/slow cold prompts: CEO
# write_interim_draft, learning_loop distillation):
#
#  1. CLEAN ENV (defensive): a running Claude Code session exports markers into
#     child processes; a *nested* ``claude -p`` inherits them. Stripping them lets
#     every nested invocation launch as a clean top-level session. Could not be
#     reproduced deterministically, so it's a suspect, not a proven cause. Auth/
#     config (CLAUDE_CONFIG_DIR, ANTHROPIC_*, PATH, HOME) are kept.
#  2. RETRY (the real fix): the SAME large prompt that exits 1 once succeeds on a
#     plain retry — the bail is transient. Retry with linear backoff. Timeouts and
#     "CLI not found" stay deterministic and are NOT retried.
# See TaskMaster legal-ai #85.
_SESSION_MARKER_PREFIXES = ("CLAUDECODE", "CLAUDE_CODE_", "CLAUDE_AGENT_")
_SESSION_MARKER_EXACT = frozenset({"AI_AGENT", "CLAUDE_EFFORT"})
MAX_RETRIES = 3
RETRY_BACKOFF_BASE = 5  # seconds; sleep = base * attempt_number

# asyncio's StreamReader rejects any line longer than its buffer limit, which
# defaults to 64 KiB. One stream-json event is one line, so a long assistant
# message or tool input would abort the stream; raise the limit well above that.
_STREAM_LINE_LIMIT = 16 * 1024 * 1024
# How much of the CLI's stderr to retain for error messages while streaming.
_STDERR_TAIL_BYTES = 4096
# How long to wait for the CLI to exit / flush stderr once stdout has closed.
_EXIT_GRACE_SECONDS = 5


def _clean_subprocess_env() -> dict[str, str]:
    """Copy the current env minus Claude Code session markers.

    Lets a nested ``claude -p`` start fresh instead of detecting it is
    already inside a Claude Code session (#85). A variable is dropped when
    its name is listed in ``_SESSION_MARKER_EXACT`` or starts with one of
    ``_SESSION_MARKER_PREFIXES``; everything else is passed through.
    """
    return {
        key: value
        for key, value in os.environ.items()
        if key not in _SESSION_MARKER_EXACT
        and not key.startswith(_SESSION_MARKER_PREFIXES)
    }


async def _kill_and_reap(proc: asyncio.subprocess.Process) -> None:
    """Kill ``proc`` if it is still running, then wait until it has exited."""
    if proc.returncode is not None:
        return
    try:
        proc.kill()
    except ProcessLookupError:
        # The child exited between the returncode check and the signal.
        pass
    await proc.wait()


async def _drain_tail(
    stream: asyncio.StreamReader,
    tail: bytearray,
    keep: int = _STDERR_TAIL_BYTES,
) -> None:
    """Read ``stream`` to EOF, retaining only the last ``keep`` bytes in ``tail``.

    Exists so that a pipe nobody else reads cannot fill up and block the
    child process while we are busy consuming its stdout.
    """
    while True:
        chunk = await stream.read(65536)
        if not chunk:
            return
        tail.extend(chunk)
        if len(tail) > keep:
            del tail[:-keep]


async def _captured_stderr(drain_task: asyncio.Future, tail: bytearray) -> str:
    """Return the stderr text captured so far, waiting briefly for EOF first."""
    # asyncio.wait neither raises on timeout nor cancels the task, so a child
    # that keeps its stderr open cannot hang us here.
    await asyncio.wait([drain_task], timeout=_EXIT_GRACE_SECONDS)
    return bytes(tail).decode("utf-8", errors="replace")


async def query(
    prompt: str,
    timeout: int = DEFAULT_TIMEOUT,
    max_turns: int = 1,
    *,
    system: str | None = None,
    model: str | None = None,
    effort: str | None = None,
) -> str:
    """Send a prompt to Claude Code headless and return the text response.

    Passes the prompt via stdin (not argv) to avoid the OS ARG_MAX limit —
    prompts can be 500K+ chars when analyzing a full style corpus.

    A non-zero exit or an empty response is treated as transient and retried
    up to ``MAX_RETRIES`` times with linear backoff. A missing CLI and a
    timeout are deterministic and raise immediately.

    Args:
        prompt: The prompt to send.
        timeout: Max seconds before the subprocess is killed (per attempt).
        max_turns: Max conversation turns (1 = single response).
        system: Optional repeated-instruction text. Prepended to ``prompt``
            for the CLI; we don't pass it as a separate arg because the CLI
            doesn't expose API-level caching. The parameter exists so
            extractors can structure their calls cleanly today, and to make
            a future SDK-backed path drop-in.
        model: Optional model alias/id (e.g. ``claude-opus-4-8``). When set,
            passed as ``--model``; otherwise the CLI's session default is
            used. Lets quality-sensitive extractors (halacha) pin a stronger
            model without changing the default for every caller.
        effort: Optional effort level (``low``/``medium``/``high``/``xhigh``/
            ``max``). When set, passed as ``--effort``. Pairs with ``model``;
            an empty string is treated as "unset" (CLI default).

    Returns:
        The ``result`` field of the CLI's JSON envelope when stdout parses as
        a JSON object containing one; otherwise the raw (stripped) stdout.

    Raises:
        RuntimeError: if the CLI is unavailable (e.g., called from the
            container — see module docstring), or fails, or times out.
    """
    full_prompt = f"{system}\n\n{prompt}" if system else prompt

    if len(full_prompt) > 150_000:
        logger.warning("Large prompt: %d chars — may hit context limits", len(full_prompt))

    cmd = [
        "claude", "-p",
        "--output-format", "json",
        "--max-turns", str(max_turns),
    ]
    if model:
        cmd += ["--model", model]
    if effort:
        cmd += ["--effort", effort]

    size_info = f"; prompt_len={len(full_prompt):,} chars" if len(full_prompt) > 100_000 else ""

    # Identical for every attempt — build once instead of once per retry.
    payload = full_prompt.encode("utf-8")
    child_env = _clean_subprocess_env()
    home_dir = os.path.expanduser("~")

    last_err = "unknown error"
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            proc = await asyncio.create_subprocess_exec(
                *cmd,
                stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                env=child_env,
                cwd=home_dir,
            )
        except FileNotFoundError as err:
            # Deterministic — never retry.
            raise RuntimeError(
                "Claude CLI not found. This module only works when invoked "
                "from the local MCP server — see the architectural rule in "
                "the module docstring. If this error came from a FastAPI "
                "endpoint in the container, refactor the call into an MCP "
                "tool that the chair triggers from Claude Code."
            ) from err

        try:
            stdout_b, stderr_b = await asyncio.wait_for(
                proc.communicate(input=payload),
                timeout=timeout,
            )
        except asyncio.TimeoutError:
            # wait_for cancellation alone leaves the child running. A timeout is
            # a real ceiling, not a transient blip — don't retry.
            await _kill_and_reap(proc)
            raise RuntimeError(f"Claude CLI timed out after {timeout}s") from None
        except asyncio.CancelledError:
            # Our caller was cancelled. Kill synchronously (no further awaits
            # while cancellation is in flight) so the child is not orphaned.
            if proc.returncode is None:
                try:
                    proc.kill()
                except ProcessLookupError:
                    pass
            raise

        stdout = stdout_b.decode("utf-8", errors="replace").strip()
        if proc.returncode != 0:
            # The CLI sometimes writes its diagnostic to stdout (or nowhere)
            # rather than stderr (#85) — surface whichever is present.
            stderr = stderr_b.decode("utf-8", errors="replace").strip()
            last_err = f"exit {proc.returncode}: {(stderr or stdout or 'no output')[:500]}"
        elif stdout:
            # claude -p --output-format json returns {"type":"result","result":"..."}
            try:
                data = json.loads(stdout)
            except json.JSONDecodeError:
                return stdout
            if isinstance(data, dict) and "result" in data:
                return data["result"]
            return stdout
        else:
            last_err = "empty response"

        # Transient failure — retry with linear backoff unless this was the last try.
        if attempt < MAX_RETRIES:
            delay = RETRY_BACKOFF_BASE * attempt
            logger.warning(
                "claude -p attempt %d/%d failed (%s%s) — retrying in %ds",
                attempt, MAX_RETRIES, last_err, size_info, delay,
            )
            await asyncio.sleep(delay)

    raise RuntimeError(
        f"Claude CLI failed after {MAX_RETRIES} attempts ({last_err}){size_info}"
    )


async def query_json(
    prompt: str,
    timeout: int = DEFAULT_TIMEOUT,
    *,
    system: str | None = None,
    model: str | None = None,
    effort: str | None = None,
) -> dict | list | None:
    """Send a prompt and parse the response as JSON.

    The raw text from :func:`query` is handed to ``parse_llm_json``, which is
    relied on for robust parsing (markdown wrapping, truncation).
    ``model``/``effort`` are forwarded to :func:`query` (see its docstring).

    Raises:
        RuntimeError: propagated from :func:`query`.
    """
    raw = await query(prompt, timeout=timeout, system=system, model=model, effort=effort)
    return parse_llm_json(raw)


# ── Streaming + session continuation ────────────────────────────────


async def query_streaming(
    prompt: str,
    *,
    system: str | None = None,
    resume_session_id: str | None = None,
    timeout: int = LONG_TIMEOUT,
    cwd: str | None = None,
):
    """Stream Claude's response as an async iterator of events.

    Wraps `claude -p --output-format=stream-json` (newline-delimited JSON
    objects from the CLI) and translates each line into a small, stable
    shape that the chat service / SSE proxy can forward without leaking
    CLI internals to the browser.

    Event shapes yielded:
        {"type": "session_id", "value": "<id>"}     # emitted once, used for resume
        {"type": "text_delta", "text": "<chunk>"}   # incremental assistant text
        {"type": "tool_use", "name": "...", "input": {...}}
        {"type": "error", "message": "..."}
        {"type": "done", "text": "<all text chunks joined>"}

    The CLI emits a richer stream; we project to this minimal set so the
    front-end can stay stable across CLI upgrades.

    Termination: a missing CLI or a CLI that closes stdin before reading the
    prompt yields a single ``error`` event and stops (no ``done``). Every
    other path — normal completion, timeout, oversized line, non-zero exit —
    ends with a ``done`` event, preceded by an ``error`` event on failure.

    Args:
        prompt: The user message to send.
        system: Optional system instructions (used only when starting a
            fresh conversation — when resume_session_id is set, the
            session already carries its system prompt).
        resume_session_id: Continue a prior conversation. When given, we
            don't re-send the system prompt; the CLI loads the entire
            conversation history from disk.
        timeout: Hard ceiling, in seconds, on reading the response stream.
        cwd: Working directory for the subprocess. ``None`` inherits this
            process's working directory (unlike :func:`query`, which always
            runs from the user's home directory).
    """
    cmd = ["claude", "-p", "--output-format", "stream-json", "--verbose"]
    if resume_session_id:
        # When resuming, system is already baked into the on-disk session
        # — sending it again would be a no-op at best and confuse the
        # conversation at worst.
        full_prompt = prompt
        cmd += ["--resume", resume_session_id]
    else:
        full_prompt = f"{system}\n\n{prompt}" if system else prompt

    if len(full_prompt) > 200_000:
        logger.warning(
            "Streaming: large prompt (%d chars) — may hit CLI input limits",
            len(full_prompt),
        )

    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=cwd,
            env=_clean_subprocess_env(),
            # One stream-json event per line; the 64 KiB default is too small.
            limit=_STREAM_LINE_LIMIT,
        )
    except FileNotFoundError:
        yield {
            "type": "error",
            "message": (
                "Claude CLI not found on host — legal-chat-service must "
                "run where the `claude` binary is installed (Daphna's host, "
                "not the legal-ai container)."
            ),
        }
        return

    assert proc.stdin is not None  # for type checkers
    assert proc.stdout is not None
    assert proc.stderr is not None

    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout

    # Drain stderr concurrently. With --verbose the CLI may write a lot there;
    # an unread pipe would fill up and block the child mid-stream.
    stderr_tail = bytearray()
    stderr_task = asyncio.create_task(_drain_tail(proc.stderr, stderr_tail))

    accumulated_text: list[str] = []
    session_id_emitted = False
    saw_result = False
    error_emitted = False

    try:
        # Send the prompt and close stdin so the CLI knows the user message
        # is complete.
        try:
            proc.stdin.write(full_prompt.encode("utf-8"))
            await proc.stdin.drain()
            proc.stdin.close()
        except (BrokenPipeError, ConnectionResetError):
            # CLI went away before reading the prompt. Make sure it is really
            # gone (so stderr reaches EOF), then report what it said.
            await _kill_and_reap(proc)
            detail = await _captured_stderr(stderr_task, stderr_tail)
            yield {
                "type": "error",
                "message": f"Claude CLI closed stdin early: {detail[:300]}",
            }
            return

        while True:
            remaining = deadline - loop.time()
            if remaining <= 0:
                error_emitted = True
                yield {"type": "error", "message": f"timed out after {timeout}s"}
                break
            try:
                line_b = await asyncio.wait_for(proc.stdout.readline(), timeout=remaining)
            except asyncio.TimeoutError:
                error_emitted = True
                yield {"type": "error", "message": f"stream timed out after {timeout}s"}
                break
            except ValueError:
                # readline() raises ValueError when a single line exceeds the
                # reader limit; the stream is no longer line-aligned after that.
                error_emitted = True
                yield {
                    "type": "error",
                    "message": f"stream line exceeded {_STREAM_LINE_LIMIT} bytes",
                }
                break
            if not line_b:
                break  # EOF on stdout

            line = line_b.decode("utf-8", errors="replace").strip()
            if not line:
                continue
            try:
                event = json.loads(line)
            except json.JSONDecodeError:
                # Stray non-JSON line from CLI — surface a snippet for debug.
                logger.debug("non-JSON stream line: %s", line[:120])
                continue
            if not isinstance(event, dict):
                # Valid JSON but not an event object — nothing to project.
                continue

            # The CLI's stream-json emits several event types. We only
            # care about the ones the chat service forwards.
            event_type = event.get("type")

            if not session_id_emitted:
                sid = event.get("session_id")
                if sid:
                    session_id_emitted = True
                    yield {"type": "session_id", "value": sid}

            if event_type == "assistant":
                # event["message"]["content"] is a list of blocks; we extract
                # text blocks and tool_use blocks.
                msg = event.get("message") or {}
                for block in msg.get("content") or []:
                    if not isinstance(block, dict):
                        continue
                    btype = block.get("type")
                    if btype == "text":
                        text = block.get("text") or ""
                        if text:
                            accumulated_text.append(text)
                            yield {"type": "text_delta", "text": text}
                    elif btype == "tool_use":
                        yield {
                            "type": "tool_use",
                            "name": block.get("name") or "",
                            "input": block.get("input") or {},
                        }
            elif event_type == "result":
                # Final synthesized result line from the CLI — we already
                # delivered the deltas, so just stop here.
                saw_result = True
                break

        if not saw_result and not error_emitted:
            # stdout closed without a result event. Give the CLI a moment to
            # exit so a failure is reported instead of a silent empty "done".
            try:
                await asyncio.wait_for(proc.wait(), timeout=_EXIT_GRACE_SECONDS)
            except asyncio.TimeoutError:
                pass
            if proc.returncode:
                captured = await _captured_stderr(stderr_task, stderr_tail)
                detail = captured.strip()[-300:] or "no output"
                yield {
                    "type": "error",
                    "message": f"Claude CLI exited with code {proc.returncode}: {detail}",
                }
    finally:
        try:
            await _kill_and_reap(proc)
        except Exception:
            # Best-effort cleanup must not mask the stream's own outcome.
            logger.debug("failed to reap claude subprocess", exc_info=True)
        stderr_task.cancel()
        # Retrieve the task's outcome so it is never left pending/unobserved.
        await asyncio.gather(stderr_task, return_exceptions=True)

    yield {"type": "done", "text": "".join(accumulated_text)}