legal-ai/mcp-server/src/legal_mcp/services/claude_session.py
Chaim 6cc100f9f8
fix(halacha): rate-limit refusal ≠ empty answer; do not checkpoint a chunk on failure (#144)
Core fix (b): when the claude CLI returned exit=0 with a limit/error notice as
its result, query treated it as success → _extract_chunk received []/non-list
and marked the chunk as done-empty; resume then skipped it forever → permanent
under-extraction (3→1→0). Now is_error/_looks_like_limit_notice turn it into a
transient failure → retry → raise → the chunk stays un-checkpointed → resume
recovers it (so force-delete is no longer permanently destructive).

+ churn-detect in the orchestrator (Δdone<0 / Δhal<-2 → warning + churn_ok in the JSON).
+ scripts/reconcile_under_extracted_halacha.py: recovers completed items with 0
  halachot and ≥3 reasoning segments (dry-run showed 15 candidates); canonical
  request path (G2), conservative (no remand).

Note: full atomicity (staging_run_id) was deferred. PR #257 mitigated the
trigger, and (b)+resume prevent permanent loss (force-delete recovers via resume).

Tests: test_claude_session_limit_notice. All 354 pass. Guards clean.
Invariants: G1, INV-G3/X16 (checkpoint = truly completed), INV-G4 (churn is not silent), G12.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-15 04:21:15 +00:00

440 lines
18 KiB
Python

"""Claude Code session bridge — runs prompts via the local `claude` CLI.

All LLM calls in legal-ai go through this module. We shell out to the local
Claude Code CLI which uses the developer's claude.ai session — zero direct
API cost.

**Architectural rule (do not violate):** this module only works when invoked
from the local MCP server (the Python process at
`/home/chaim/legal-ai/mcp-server/`, launched per `~/.claude.json`). It will
**not** work when called from the legal-ai Docker container — that container
has no `claude` CLI and no claude.ai session. Any code path under `web/`
(FastAPI) that calls this module — directly or via an extractor like
`halacha_extractor`, `claims_extractor`, `precedent_metadata_extractor`,
`block_writer`, `qa_validator`, `learning_loop`, `local_classifier`,
`appraiser_facts_extractor`, `brainstorm`, `style_analyzer` — is wrong.
LLM-dependent operations must be exposed as MCP tools and triggered from
agents (or the chair via Claude Code), where this module runs locally with
CLI access.

Async history: originally synchronous (``subprocess.run``) with a 120 s
timeout. That broke for large legal documents — sync subprocess stalled the
asyncio loop, and 120 s was far too short for cold-cache Hebrew prompts
(case 8174-24 hit three timeouts in a row). Fixed by going async with a
30-minute ceiling.
"""
from __future__ import annotations

import asyncio
import json
import logging
import os

from legal_mcp.config import parse_llm_json

logger = logging.getLogger(__name__)
# Default ceiling for any single ``claude -p`` invocation, in seconds.
# 30 min covers any single-document call we make in practice (chunking
# handles the rest); the bound exists only to prevent runaway zombies.
DEFAULT_TIMEOUT = 1800
LONG_TIMEOUT = 3600  # opus block writing on full case context

# #85 — two complementary hardenings for the same symptom (`claude -p` failing
# with a fast non-zero exit + empty stderr on large/slow cold prompts: CEO
# write_interim_draft, learning_loop distillation):
#
# 1. CLEAN ENV (defensive): a running Claude Code session exports markers into
#    child processes; a *nested* ``claude -p`` inherits them. Stripping them lets
#    every nested invocation launch as a clean top-level session. Could not be
#    reproduced deterministically, so it's a suspect, not a proven cause. Auth/
#    config (CLAUDE_CONFIG_DIR, ANTHROPIC_*, PATH, HOME) are kept.
# 2. RETRY (the real fix): the SAME large prompt that exits 1 once succeeds on a
#    plain retry — the bail is transient. Retry with linear backoff. Timeouts and
#    "CLI not found" stay deterministic and are NOT retried.
# See TaskMaster legal-ai #85.
_SESSION_MARKER_PREFIXES = ("CLAUDECODE", "CLAUDE_CODE_", "CLAUDE_AGENT_")
_SESSION_MARKER_EXACT = frozenset({"AI_AGENT", "CLAUDE_EFFORT"})
MAX_RETRIES = 3
RETRY_BACKOFF_BASE = 5  # seconds; sleep = base * attempt_number

# Phrases the CLI emits as the "result" of an exit-0 run that actually hit a
# usage/rate limit (a refusal NOTICE, not a real answer). Matched only against a
# result that is NOT structured output (doesn't start with [ or {), so a genuine
# JSON extraction containing these words as content is never mis-flagged.
_LIMIT_NOTICE_MARKERS = (
    "usage limit",
    "rate limit",
    "limit reached",
    "limit will reset",
    "try again later",
)


def _looks_like_limit_notice(data: dict) -> bool:
    """True if an exit-0 ``result`` is really a usage/rate-limit refusal."""
    result = data.get("result")
    if not isinstance(result, str):
        return False
    stripped = result.lstrip()
    # Structured output (JSON array/object) is a real answer, never a notice.
    if stripped.startswith(("[", "{")):
        return False
    low = result.lower()
    return any(m in low for m in _LIMIT_NOTICE_MARKERS)
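
# Illustrative sketch (not part of the module): the check above applied to a few
# made-up payloads. The marker tuple and the function are copied locally under
# hypothetical public names so the snippet runs on its own. The payload texts
# are assumptions, not captured CLI output.

```python
LIMIT_NOTICE_MARKERS = (
    "usage limit",
    "rate limit",
    "limit reached",
    "limit will reset",
    "try again later",
)


def looks_like_limit_notice(data: dict) -> bool:
    """Local copy of the check: plain-text result containing a limit phrase."""
    result = data.get("result")
    if not isinstance(result, str):
        return False
    # Structured output (JSON array/object) is treated as a real answer.
    if result.lstrip().startswith(("[", "{")):
        return False
    low = result.lower()
    return any(marker in low for marker in LIMIT_NOTICE_MARKERS)


# Hypothetical payloads, shaped like the parsed CLI JSON ({"result": ...}).
notice = {"result": "Usage limit reached. Your limit will reset later today."}
json_answer = {"result": '[{"rule": "the rate limit clause applies"}]'}
prose_answer = {"result": "The appeal is dismissed."}
missing_result = {"result": None}

flags = [
    looks_like_limit_notice(p)
    for p in (notice, json_answer, prose_answer, missing_result)
]
```

# Only the first payload is flagged: the JSON answer mentions "rate limit" as
# content but starts with "[", so it is kept as a genuine result.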


def _clean_subprocess_env() -> dict[str, str]:
    """Copy the current env minus Claude Code session markers.

    Lets a nested ``claude -p`` start fresh instead of detecting it is
    already inside a Claude Code session (#85).
    """
    env = dict(os.environ)
    for key in list(env):
        if key in _SESSION_MARKER_EXACT or key.startswith(_SESSION_MARKER_PREFIXES):
            del env[key]
    return env
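
# Illustrative sketch (not part of the module): the same filtering rule applied
# to an explicit dict instead of os.environ, so the effect is easy to inspect.
# `strip_session_markers` and every variable name and value in `sample_env` are
# hypothetical; only the prefix/exact-match sets come from the module above.

```python
SESSION_MARKER_PREFIXES = ("CLAUDECODE", "CLAUDE_CODE_", "CLAUDE_AGENT_")
SESSION_MARKER_EXACT = frozenset({"AI_AGENT", "CLAUDE_EFFORT"})


def strip_session_markers(env: dict) -> dict:
    """Return a copy of `env` without session-marker variables."""
    return {
        key: value
        for key, value in env.items()
        if key not in SESSION_MARKER_EXACT
        and not key.startswith(SESSION_MARKER_PREFIXES)
    }


# Made-up environment: four variables to keep, five markers to drop.
sample_env = {
    "PATH": "/usr/bin",
    "HOME": "/home/user",
    "CLAUDE_CONFIG_DIR": "/home/user/.claude",
    "ANTHROPIC_EXAMPLE": "kept",
    "CLAUDECODE": "1",
    "CLAUDE_CODE_EXAMPLE": "dropped",
    "CLAUDE_AGENT_EXAMPLE": "dropped",
    "AI_AGENT": "1",
    "CLAUDE_EFFORT": "high",
}

cleaned = strip_session_markers(sample_env)
kept_names = sorted(cleaned)
```

# CLAUDE_CONFIG_DIR survives because it matches neither a marker prefix nor an
# exact marker name, which is what keeps auth/config visible to the child.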


async def query(
    prompt: str,
    timeout: int = DEFAULT_TIMEOUT,
    max_turns: int = 1,
    *,
    system: str | None = None,
    model: str | None = None,
    effort: str | None = None,
    tools: str | None = None,
) -> str:
    """Send a prompt to Claude Code headless and return the text response.

    Passes the prompt via stdin (not argv) to avoid the OS ARG_MAX limit —
    prompts can be 500K+ chars when analyzing a full style corpus.

    Args:
        prompt: The prompt to send.
        timeout: Max seconds before the subprocess is killed.
        max_turns: Max conversation turns (1 = single response).
        system: Optional repeated-instruction text. Prepended to ``prompt``
            for the CLI; we don't pass it as a separate arg because the
            CLI doesn't expose API-level caching. The parameter exists so
            extractors can structure their calls cleanly today, and to make
            a future SDK-backed path drop-in.
        model: Optional model alias/id (e.g. ``claude-opus-4-8``). When set,
            passed as ``--model``; otherwise the CLI's session default is
            used. Lets quality-sensitive extractors (halacha) pin a stronger
            model without changing the default for every caller.
        effort: Optional effort level (``low``/``medium``/``high``/``xhigh``/
            ``max``). When set, passed as ``--effort``. Pairs with ``model``;
            an empty string is treated as "unset" (CLI default).
        tools: Optional available-tools spec, passed as ``--tools``. Pass an
            empty string (``""``) to disable ALL tools — for pure text→JSON
            extraction the model has no reason to call a tool, and leaving
            tools enabled makes it occasionally emit ``stop_reason: tool_use``
            which trips ``--max-turns 1`` → ``error_max_turns`` and forces a
            retry (slow). ``None`` leaves the CLI default (all tools).

    Returns:
        The text response from Claude.

    Raises:
        RuntimeError: if the CLI is unavailable (e.g., called from the
            container — see module docstring), or fails, or times out.
    """
    full_prompt = f"{system}\n\n{prompt}" if system else prompt
    if len(full_prompt) > 150_000:
        logger.warning("Large prompt: %d chars — may hit context limits", len(full_prompt))

    cmd = [
        "claude", "-p",
        "--output-format", "json",
        "--max-turns", str(max_turns),
    ]
    if model:
        cmd += ["--model", model]
    if effort:
        cmd += ["--effort", effort]
    if tools is not None:  # "" → disable all tools (no tool_use → no max-turns trip)
        cmd += ["--tools", tools]

    size_info = f"; prompt_len={len(full_prompt):,} chars" if len(full_prompt) > 100_000 else ""
    last_err = "unknown error"

    for attempt in range(1, MAX_RETRIES + 1):
        try:
            proc = await asyncio.create_subprocess_exec(
                *cmd,
                stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                env=_clean_subprocess_env(),
                cwd=os.path.expanduser("~"),
            )
        except FileNotFoundError:
            # Deterministic — never retry.
            raise RuntimeError(
                "Claude CLI not found. This module only works when invoked "
                "from the local MCP server — see the architectural rule in "
                "the module docstring. If this error came from a FastAPI "
                "endpoint in the container, refactor the call into an MCP "
                "tool that the chair triggers from Claude Code."
            )

        try:
            stdout_b, stderr_b = await asyncio.wait_for(
                proc.communicate(input=full_prompt.encode("utf-8")),
                timeout=timeout,
            )
        except asyncio.TimeoutError:
            # wait_for cancellation alone leaves the child running. A timeout is
            # a real ceiling, not a transient blip — don't retry.
            try:
                proc.kill()
                await proc.wait()
            except ProcessLookupError:
                pass
            raise RuntimeError(f"Claude CLI timed out after {timeout}s")

        if proc.returncode != 0:
            # The CLI sometimes writes its diagnostic to stdout (or nowhere)
            # rather than stderr (#85) — surface whichever is present.
            stderr = stderr_b.decode("utf-8", errors="replace").strip()
            stdout = stdout_b.decode("utf-8", errors="replace").strip()
            last_err = f"exit {proc.returncode}: {(stderr or stdout or 'no output')[:500]}"
        else:
            stdout = stdout_b.decode("utf-8", errors="replace").strip()
            if stdout:
                # claude -p --output-format json returns {"type":"result","result":"..."}
                try:
                    data = json.loads(stdout)
                    if isinstance(data, dict) and "result" in data:
                        # A usage/rate-limit hit can exit 0 with a refusal NOTICE
                        # as the "result" (is_error / error subtype). Returning it
                        # as success makes callers treat a throttled run as a real
                        # empty answer — e.g. the halacha extractor then checkpoints
                        # the chunk as done-with-0-halachot and a resume skips it
                        # forever (#138/#144 silent under-extraction). Treat it as a
                        # transient failure → retry, and raise if it persists so the
                        # chunk stays un-checkpointed for a real resume.
                        if data.get("is_error") or _looks_like_limit_notice(data):
                            last_err = (
                                f"error result (subtype={data.get('subtype')}): "
                                f"{str(data.get('result',''))[:200]}"
                            )
                        else:
                            return data["result"]
                    else:
                        return stdout
                except json.JSONDecodeError:
                    return stdout
            else:
                last_err = "empty response"

        # Transient failure — retry with linear backoff unless this was the last try.
        if attempt < MAX_RETRIES:
            logger.warning(
                "claude -p attempt %d/%d failed (%s%s) — retrying in %ds",
                attempt, MAX_RETRIES, last_err, size_info, RETRY_BACKOFF_BASE * attempt,
            )
            await asyncio.sleep(RETRY_BACKOFF_BASE * attempt)

    raise RuntimeError(
        f"Claude CLI failed after {MAX_RETRIES} attempts ({last_err}){size_info}"
    )
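
# Illustrative sketch (not part of the module): the retry-with-linear-backoff
# shape used by query(), reduced to a generic helper so the sleep schedule can
# be observed without launching the CLI. `retry_transient`, `flaky_op` and
# `fake_sleep` are hypothetical names; the constants mirror MAX_RETRIES = 3 and
# RETRY_BACKOFF_BASE = 5 above.

```python
import asyncio


async def retry_transient(op, *, max_retries=3, base=5, sleep=asyncio.sleep):
    """Call `op(attempt)` until it reports success or attempts run out.

    `op` returns (True, value) on success or (False, error_text) on a
    transient failure. Sleeps base * attempt between attempts, never after
    the last one.
    """
    last_err = "unknown error"
    for attempt in range(1, max_retries + 1):
        ok, value = await op(attempt)
        if ok:
            return value
        last_err = value
        if attempt < max_retries:
            await sleep(base * attempt)
    raise RuntimeError(f"failed after {max_retries} attempts ({last_err})")


slept = []


async def fake_sleep(seconds):
    # Record the requested delay instead of actually waiting.
    slept.append(seconds)


async def flaky_op(attempt):
    # Fails twice, then succeeds, like a transient CLI bail.
    if attempt < 3:
        return False, f"exit 1 on attempt {attempt}"
    return True, "ok"


result = asyncio.run(retry_transient(flaky_op, sleep=fake_sleep))
```

# With three attempts the helper sleeps 5 s and then 10 s, so a prompt that
# keeps failing transiently costs 15 s of backoff on top of the attempts.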


async def query_json(
    prompt: str,
    timeout: int = DEFAULT_TIMEOUT,
    *,
    system: str | None = None,
    model: str | None = None,
    effort: str | None = None,
    tools: str | None = None,
) -> dict | list | None:
    """Send a prompt and parse the response as JSON.

    Uses parse_llm_json for robust parsing (handles markdown wrapping, truncation).
    ``model``/``effort``/``tools`` are forwarded to :func:`query` (see its docstring).
    Pure text→JSON extractors should pass ``tools=""`` to avoid ``error_max_turns``.
    """
    raw = await query(prompt, timeout=timeout, system=system, model=model, effort=effort, tools=tools)
    return parse_llm_json(raw)
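
# Illustrative sketch (not part of the module): one way a list-expecting caller
# might use query_json, assuming it prefers to fail loudly on a non-list rather
# than record an empty answer. `extract_list` and the two stubs are
# hypothetical; the stubs stand in for query_json so the snippet runs without
# the CLI. How the real extractors handle this is not shown in this file.

```python
import asyncio


async def extract_list(query_json_fn, prompt: str) -> list:
    """Return the parsed list, or raise if the answer is not a list."""
    # tools="" disables all tools, as recommended for pure text-to-JSON calls.
    data = await query_json_fn(prompt, tools="")
    if not isinstance(data, list):
        raise ValueError(f"expected a JSON list, got {type(data).__name__}")
    return data


async def stub_returns_list(prompt, **kwargs):
    # Stand-in for a successful query_json call.
    return [{"id": 1}]


async def stub_returns_none(prompt, **kwargs):
    # Stand-in for a response that could not be parsed as JSON.
    return None


good = asyncio.run(extract_list(stub_returns_list, "extract rules"))

try:
    asyncio.run(extract_list(stub_returns_none, "extract rules"))
    rejected = False
except ValueError:
    rejected = True
```

# Raising on a non-list keeps the decision with the caller: it can retry or
# leave the unit of work unmarked instead of storing an empty result.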


# ── Streaming + session continuation ────────────────────────────────


async def query_streaming(
    prompt: str,
    *,
    system: str | None = None,
    resume_session_id: str | None = None,
    timeout: int = LONG_TIMEOUT,
    cwd: str | None = None,
):
    """Stream Claude's response as an async iterator of events.

    Wraps `claude -p --output-format=stream-json` (newline-delimited JSON
    objects from the CLI) and translates each line into a small, stable
    shape that the chat service / SSE proxy can forward without leaking
    CLI internals to the browser.

    Event shapes yielded:
        {"type": "session_id", "value": "<uuid>"}     # first event, used for resume
        {"type": "text_delta", "text": "<partial>"}   # incremental assistant text
        {"type": "tool_use", "name": "...", "input": {...}}
        {"type": "error", "message": "..."}
        {"type": "done", "text": "<full response>"}

    The CLI emits a richer stream; we project to this minimal set so the
    front-end can stay stable across CLI upgrades.

    Args:
        prompt: The user message to send.
        system: Optional system instructions (used only when starting a
            fresh conversation — when resume_session_id is set, the
            session already carries its system prompt).
        resume_session_id: Continue a prior conversation. When given,
            we don't re-send the system prompt; the CLI loads the
            entire conversation history from disk.
        timeout: Hard ceiling on the subprocess.
        cwd: Working directory for the subprocess — defaults to the
            host's HOME so claude.ai credentials resolve correctly.
    """
    if resume_session_id:
        # When resuming, system is already baked into the on-disk session
        # — sending it again would be a no-op at best and confuse the
        # conversation at worst.
        full_prompt = prompt
        cmd = [
            "claude", "-p",
            "--output-format", "stream-json",
            "--verbose",
            "--resume", resume_session_id,
        ]
    else:
        full_prompt = f"{system}\n\n{prompt}" if system else prompt
        cmd = [
            "claude", "-p",
            "--output-format", "stream-json",
            "--verbose",
        ]

    if len(full_prompt) > 200_000:
        logger.warning(
            "Streaming: large prompt (%d chars) — may hit CLI input limits",
            len(full_prompt),
        )

    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=cwd,
            env=_clean_subprocess_env(),
        )
    except FileNotFoundError:
        yield {
            "type": "error",
            "message": (
                "Claude CLI not found on host — legal-chat-service must "
                "run where the `claude` binary is installed (Daphna's host, "
                "not the legal-ai container)."
            ),
        }
        return

    assert proc.stdin is not None  # for type checkers
    assert proc.stdout is not None

    # Send the prompt and close stdin so the CLI knows the user message
    # is complete.
    try:
        proc.stdin.write(full_prompt.encode("utf-8"))
        await proc.stdin.drain()
        proc.stdin.close()
    except BrokenPipeError:
        # CLI exited before reading the prompt — drain stderr and bail.
        stderr_b = await proc.stderr.read() if proc.stderr else b""
        yield {
            "type": "error",
            "message": f"Claude CLI closed stdin early: {stderr_b.decode('utf-8', errors='replace')[:300]}",
        }
        return

    accumulated_text: list[str] = []
    session_id_emitted = False
    deadline = asyncio.get_event_loop().time() + timeout

    try:
        while True:
            remaining = deadline - asyncio.get_event_loop().time()
            if remaining <= 0:
                yield {"type": "error", "message": f"timed out after {timeout}s"}
                break
            try:
                line_b = await asyncio.wait_for(proc.stdout.readline(), timeout=remaining)
            except asyncio.TimeoutError:
                yield {"type": "error", "message": f"stream timed out after {timeout}s"}
                break
            if not line_b:
                break
            line = line_b.decode("utf-8", errors="replace").strip()
            if not line:
                continue
            try:
                event = json.loads(line)
            except json.JSONDecodeError:
                # Stray non-JSON line from CLI — surface a snippet for debug.
                logger.debug("non-JSON stream line: %s", line[:120])
                continue

            # The CLI's stream-json emits several event types. We only
            # care about the ones the chat service forwards.
            t = event.get("type")
            if not session_id_emitted:
                sid = event.get("session_id")
                if sid:
                    session_id_emitted = True
                    yield {"type": "session_id", "value": sid}

            if t == "assistant":
                # event["message"]["content"] is a list of blocks; we extract
                # text blocks and tool_use blocks.
                msg = event.get("message") or {}
                for block in msg.get("content") or []:
                    btype = block.get("type")
                    if btype == "text":
                        text = block.get("text") or ""
                        if text:
                            accumulated_text.append(text)
                            yield {"type": "text_delta", "text": text}
                    elif btype == "tool_use":
                        yield {
                            "type": "tool_use",
                            "name": block.get("name") or "",
                            "input": block.get("input") or {},
                        }
            elif t == "result":
                # Final synthesized result line from the CLI — we already
                # delivered the deltas, so just stop here.
                break
    finally:
        if proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass
            try:
                await proc.wait()
            except Exception:
                pass

    yield {"type": "done", "text": "".join(accumulated_text)}
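
# Illustrative sketch (not part of the module): a consumer for the event shapes
# documented in query_streaming's docstring. `fake_stream` is a hypothetical
# stand-in that yields hand-written events, so the snippet runs without the
# CLI; a real caller would iterate over query_streaming(...) instead. The
# session id and tool name are made up.

```python
import asyncio


async def fake_stream():
    """Yield a hand-written sequence in the documented event shapes."""
    yield {"type": "session_id", "value": "00000000-0000-0000-0000-000000000000"}
    yield {"type": "text_delta", "text": "Hello, "}
    yield {"type": "tool_use", "name": "example_tool", "input": {}}
    yield {"type": "text_delta", "text": "world."}
    yield {"type": "done", "text": "Hello, world."}


async def consume(events) -> dict:
    """Collect the session id, streamed text, errors and final text."""
    summary = {"session_id": None, "text": "", "errors": [], "final": None}
    parts = []
    async for event in events:
        kind = event.get("type")
        if kind == "session_id":
            summary["session_id"] = event["value"]  # keep for a later resume
        elif kind == "text_delta":
            parts.append(event["text"])
        elif kind == "error":
            summary["errors"].append(event["message"])
        elif kind == "done":
            summary["final"] = event["text"]
        # tool_use events are ignored in this sketch.
    summary["text"] = "".join(parts)
    return summary


summary = asyncio.run(consume(fake_stream()))
```

# The joined text_delta pieces match the text carried by the closing "done"
# event, so a consumer can rely on either one for the full response.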