Files
legal-ai/mcp-server/src/legal_mcp/services/claude_session.py
Chaim 9d2536a667 fix(#85): claude_session — retry על כשלים חולפים של claude -p
שורש #85 התברר: `claude -p` נכשל מדי פעם ב-exit מהיר + stderr ריק על
פרומפטים גדולים/איטיים (CEO write_interim_draft, learning_loop distillation),
**אותו פרומפט מצליח בריצה חוזרת** — כשל חולף, לא nesting (אומת: nested claude
מ-bash וגם פרומפט 70K הצליחו; הכשל אינו דטרמיניסטי).

query() עוטף spawn+communicate ב-לולאת retry (MAX_RETRIES=3, backoff לינארי
5s*attempt). FileNotFoundError + timeout נשארים דטרמיניסטיים (ללא retry).
empty-response גם מטופל כ-transient.

אומת e2e: distillation על 1130-25 רץ בהצלחה → pair=analyzed (9 שינויים,
6 style_method, 33.8% diff). פותר גם את write_interim_draft של ה-CEO.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 20:05:57 +00:00

357 lines
14 KiB
Python

"""Claude Code session bridge — runs prompts via the local `claude` CLI.
All LLM calls in legal-ai go through this module. We shell out to the local
Claude Code CLI which uses the developer's claude.ai session — zero direct
API cost.
**Architectural rule (do not violate):** this module only works when invoked
from the local MCP server (the Python process at
`/home/chaim/legal-ai/mcp-server/`, launched per `~/.claude.json`). It will
**not** work when called from the legal-ai Docker container — that container
has no `claude` CLI and no claude.ai session. Any code path under `web/`
(FastAPI) that calls this module — directly or via an extractor like
`halacha_extractor`, `claims_extractor`, `precedent_metadata_extractor`,
`block_writer`, `qa_validator`, `learning_loop`, `local_classifier`,
`appraiser_facts_extractor`, `brainstorm`, `style_analyzer` — is wrong.
LLM-dependent operations must be exposed as MCP tools and triggered from
agents (or the chair via Claude Code), where this module runs locally with
CLI access.
Async history: originally synchronous (``subprocess.run``) with a 120 s
timeout. That broke for large legal documents — sync subprocess stalled the
asyncio loop, and 120 s was far too short for cold-cache Hebrew prompts
(case 8174-24 hit three timeouts in a row). Fixed by going async with a
30-minute ceiling.
"""
from __future__ import annotations
import asyncio
import json
import logging
from legal_mcp.config import parse_llm_json
logger = logging.getLogger(__name__)
# Default ceiling for any single ``claude -p`` invocation, in seconds
# (1800 s = 30 min). 30 min covers any single-document call we make in
# practice (chunking handles the rest); the bound exists only to prevent
# runaway zombies.
DEFAULT_TIMEOUT = 1800
# Longer ceiling, in seconds (3600 s = 60 min): opus block writing on full
# case context. Also the default ``timeout`` of ``query_streaming``.
LONG_TIMEOUT = 3600
# #85 — `claude -p` fails intermittently with a fast non-zero exit and empty
# stderr (observed on large/slow cold prompts: CEO write_interim_draft,
# learning_loop distillation). The SAME prompt succeeds on retry, so the bail is
# transient — retry with linear backoff. Timeouts and "CLI not found" are
# deterministic and are NOT retried.
# Total number of attempts ``query`` makes (the first try included).
MAX_RETRIES = 3
# Backoff unit in seconds; the sleep before the next attempt is
# ``RETRY_BACKOFF_BASE * attempt_number``.
RETRY_BACKOFF_BASE = 5
async def query(
    prompt: str,
    timeout: int = DEFAULT_TIMEOUT,
    max_turns: int = 1,
    *,
    system: str | None = None,
    model: str | None = None,
    effort: str | None = None,
) -> str:
    """Send a prompt to Claude Code headless and return the text response.

    The prompt is passed via stdin (not argv) to avoid the OS ARG_MAX limit —
    prompts can be 500K+ chars when analyzing a full style corpus.

    A non-zero exit or an empty stdout is treated as a transient failure and
    retried up to ``MAX_RETRIES`` attempts with linear backoff
    (``RETRY_BACKOFF_BASE * attempt`` seconds). A missing CLI binary and a
    timeout are treated as deterministic and raise immediately.

    Args:
        prompt: The prompt to send.
        timeout: Max seconds before the subprocess is killed (per attempt).
        max_turns: Max conversation turns (1 = single response).
        system: Optional repeated-instruction text. Prepended to ``prompt``
            for the CLI; we don't pass it as a separate arg because the
            CLI doesn't expose API-level caching. The parameter exists so
            extractors can structure their calls cleanly today, and to make
            a future SDK-backed path drop-in.
        model: Optional model alias/id (e.g. ``claude-opus-4-8``). When set,
            passed as ``--model``; otherwise the CLI's session default is
            used. Lets quality-sensitive extractors (halacha) pin a stronger
            model without changing the default for every caller.
        effort: Optional effort level (``low``/``medium``/``high``/``xhigh``/
            ``max``). When set, passed as ``--effort``. Pairs with ``model``;
            an empty string is treated as "unset" (CLI default).

    Returns:
        The ``result`` field of the CLI's JSON envelope when stdout parses as
        a JSON object containing that key; otherwise the raw (stripped)
        stdout text.

    Raises:
        RuntimeError: if the CLI is unavailable (e.g., called from the
            container — see module docstring), or fails on every attempt,
            or times out.
    """
    full_prompt = f"{system}\n\n{prompt}" if system else prompt
    prompt_len = len(full_prompt)
    if prompt_len > 150_000:
        logger.warning("Large prompt: %d chars — may hit context limits", prompt_len)

    cmd = [
        "claude", "-p",
        "--output-format", "json",
        "--max-turns", str(max_turns),
    ]
    if model:
        cmd += ["--model", model]
    if effort:
        cmd += ["--effort", effort]

    size_info = f"; prompt_len={prompt_len:,} chars" if prompt_len > 100_000 else ""
    # Encode once; the same bytes are fed to every retry attempt.
    payload = full_prompt.encode("utf-8")
    last_err = "unknown error"

    for attempt in range(1, MAX_RETRIES + 1):
        try:
            proc = await asyncio.create_subprocess_exec(
                *cmd,
                stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
        except FileNotFoundError as exc:
            # Deterministic — never retry.
            raise RuntimeError(
                "Claude CLI not found. This module only works when invoked "
                "from the local MCP server — see the architectural rule in "
                "the module docstring. If this error came from a FastAPI "
                "endpoint in the container, refactor the call into an MCP "
                "tool that the chair triggers from Claude Code."
            ) from exc

        try:
            stdout_b, stderr_b = await asyncio.wait_for(
                proc.communicate(input=payload),
                timeout=timeout,
            )
        except asyncio.TimeoutError as exc:
            # A timeout is a real ceiling, not a transient blip — don't retry.
            raise RuntimeError(f"Claude CLI timed out after {timeout}s") from exc
        finally:
            # wait_for cancellation alone leaves the child running, and so
            # does cancellation of this task by the caller. Whatever the exit
            # path, never leave a live child behind.
            if proc.returncode is None:
                try:
                    proc.kill()
                except ProcessLookupError:
                    pass  # child exited between the check and the kill
                await proc.wait()

        if proc.returncode != 0:
            stderr = stderr_b.decode("utf-8", errors="replace").strip()[:500] or "unknown error"
            last_err = f"exit {proc.returncode}: {stderr}"
        else:
            stdout = stdout_b.decode("utf-8", errors="replace").strip()
            if stdout:
                # claude -p --output-format json returns {"type":"result","result":"..."}
                try:
                    data = json.loads(stdout)
                except json.JSONDecodeError:
                    return stdout
                if isinstance(data, dict) and "result" in data:
                    return data["result"]
                return stdout
            last_err = "empty response"

        # Transient failure — retry with linear backoff unless this was the last try.
        if attempt < MAX_RETRIES:
            delay = RETRY_BACKOFF_BASE * attempt
            logger.warning(
                "claude -p attempt %d/%d failed (%s%s) — retrying in %ds",
                attempt, MAX_RETRIES, last_err, size_info, delay,
            )
            await asyncio.sleep(delay)

    raise RuntimeError(
        f"Claude CLI failed after {MAX_RETRIES} attempts ({last_err}){size_info}"
    )
async def query_json(
    prompt: str,
    timeout: int = DEFAULT_TIMEOUT,
    *,
    system: str | None = None,
    model: str | None = None,
    effort: str | None = None,
) -> dict | list | None:
    """Run ``prompt`` through :func:`query` and decode the reply as JSON.

    The raw text reply is handed to ``parse_llm_json``, which is relied on
    for tolerant parsing (markdown wrapping, truncation). ``timeout``,
    ``system``, ``model`` and ``effort`` are passed straight through to
    :func:`query`; see its docstring for their meaning.

    Returns:
        Whatever ``parse_llm_json`` produces for the reply text.
    """
    return parse_llm_json(
        await query(
            prompt,
            timeout=timeout,
            system=system,
            model=model,
            effort=effort,
        )
    )
# ── Streaming + session continuation ────────────────────────────────
async def query_streaming(
    prompt: str,
    *,
    system: str | None = None,
    resume_session_id: str | None = None,
    timeout: int = LONG_TIMEOUT,
    cwd: str | None = None,
):
    """Stream Claude's response as an async iterator of events.

    Wraps `claude -p --output-format=stream-json` (newline-delimited JSON
    objects from the CLI) and translates each line into a small, stable
    shape that the chat service / SSE proxy can forward without leaking
    CLI internals to the browser.

    Event shapes yielded:
        {"type": "session_id", "value": "<uuid>"}    # emitted once, used for resume
        {"type": "text_delta", "text": "<partial>"}  # incremental assistant text
        {"type": "tool_use", "name": "...", "input": {...}}
        {"type": "error", "message": "..."}
        {"type": "done", "text": "<full response>"}

    The CLI emits a richer stream; we project to this minimal set so the
    front-end can stay stable across CLI upgrades. ``done`` is always the
    last event once streaming has started; it is NOT emitted when the CLI
    binary is missing or when the CLI closes stdin before the prompt is
    delivered (those paths yield a single ``error`` and stop).

    Args:
        prompt: The user message to send.
        system: Optional system instructions (used only when starting a
            fresh conversation — when resume_session_id is set, the
            session already carries its system prompt).
        resume_session_id: Continue a prior conversation. When given,
            we don't re-send the system prompt; the CLI loads the
            entire conversation history from disk.
        timeout: Hard ceiling, in seconds, on reading the stream.
        cwd: Working directory for the subprocess. Passed through as-is;
            ``None`` means the child inherits this process's working
            directory.
    """
    # asyncio's StreamReader caps a single line at 64 KiB by default and
    # readline() raises ValueError beyond that. One stream-json event is one
    # line, so a long assistant message or tool payload can exceed the
    # default; raise the cap well above any realistic single event.
    stream_limit = 16 * 1024 * 1024

    cmd = [
        "claude", "-p",
        "--output-format", "stream-json",
        "--verbose",
    ]
    if resume_session_id:
        # When resuming, system is already baked into the on-disk session
        # — sending it again would be a no-op at best and confuse the
        # conversation at worst.
        full_prompt = prompt
        cmd += ["--resume", resume_session_id]
    else:
        full_prompt = f"{system}\n\n{prompt}" if system else prompt

    if len(full_prompt) > 200_000:
        logger.warning(
            "Streaming: large prompt (%d chars) — may hit CLI input limits",
            len(full_prompt),
        )

    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=cwd,
            limit=stream_limit,
        )
    except FileNotFoundError:
        yield {
            "type": "error",
            "message": (
                "Claude CLI not found on host — legal-chat-service must "
                "run where the `claude` binary is installed (Daphna's host, "
                "not the legal-ai container)."
            ),
        }
        return

    assert proc.stdin is not None  # for type checkers
    assert proc.stdout is not None

    accumulated_text: list[str] = []
    session_id_emitted = False

    # Everything after the spawn runs under one try/finally so the child is
    # killed and reaped on every exit path, including the early-stdin-failure
    # return and the consumer closing the generator mid-stream.
    try:
        # Send the prompt and close stdin so the CLI knows the user message
        # is complete.
        try:
            proc.stdin.write(full_prompt.encode("utf-8"))
            await proc.stdin.drain()
            proc.stdin.close()
        except (BrokenPipeError, ConnectionResetError):
            # CLI exited before reading the prompt — drain stderr and bail.
            stderr_b = await proc.stderr.read() if proc.stderr else b""
            yield {
                "type": "error",
                "message": f"Claude CLI closed stdin early: {stderr_b.decode('utf-8', errors='replace')[:300]}",
            }
            return

        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout

        while True:
            remaining = deadline - loop.time()
            if remaining <= 0:
                yield {"type": "error", "message": f"timed out after {timeout}s"}
                break
            try:
                line_b = await asyncio.wait_for(proc.stdout.readline(), timeout=remaining)
            except asyncio.TimeoutError:
                yield {"type": "error", "message": f"stream timed out after {timeout}s"}
                break
            except ValueError:
                # A single line exceeded even the raised reader limit.
                yield {
                    "type": "error",
                    "message": f"stream line exceeded {stream_limit} bytes",
                }
                break

            if not line_b:
                break  # EOF — the CLI closed stdout
            line = line_b.decode("utf-8", errors="replace").strip()
            if not line:
                continue
            try:
                event = json.loads(line)
            except json.JSONDecodeError:
                # Stray non-JSON line from CLI — surface a snippet for debug.
                logger.debug("non-JSON stream line: %s", line[:120])
                continue
            if not isinstance(event, dict):
                # Valid JSON but not an event object — nothing to project.
                logger.debug("non-object stream line: %s", line[:120])
                continue

            # The CLI's stream-json emits several event types. We only
            # care about the ones the chat service forwards.
            t = event.get("type")
            if not session_id_emitted:
                sid = event.get("session_id")
                if sid:
                    session_id_emitted = True
                    yield {"type": "session_id", "value": sid}

            if t == "assistant":
                # event["message"]["content"] is a list of blocks; we extract
                # text blocks and tool_use blocks.
                msg = event.get("message") or {}
                for block in msg.get("content") or []:
                    if not isinstance(block, dict):
                        continue
                    btype = block.get("type")
                    if btype == "text":
                        text = block.get("text") or ""
                        if text:
                            accumulated_text.append(text)
                            yield {"type": "text_delta", "text": text}
                    elif btype == "tool_use":
                        yield {
                            "type": "tool_use",
                            "name": block.get("name") or "",
                            "input": block.get("input") or {},
                        }
            elif t == "result":
                # Final synthesized result line from the CLI — we already
                # delivered the deltas, so just stop here.
                break
    finally:
        if proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # child exited between the check and the kill
        try:
            await proc.wait()
        except Exception:
            # Best-effort reap: never let cleanup mask the real outcome,
            # but leave a trace for debugging.
            logger.debug("error while reaping claude subprocess", exc_info=True)

    yield {"type": "done", "text": "".join(accumulated_text)}