ספ חדש לשכבת citator פנימית — תיקוף הלכות לפי טיפול-שיפוטי מצטבר (ציטוטים נכנסים), לצמצום היקף האישור-הידני של היו"ר: - docs/spec/X11-citation-corroboration.md — 6 invariants (INV-COR1–COR6), כל אחד עם ≥3 מקורות מקצועיים (Shepard's/KeyCite, Hellyer LLJ 2018, UNC Law, NCSC/JTC, CEPEJ). - docs/spec/00-constitution.md — תיקון מבוקר ל-INV-G10: השער מסופק ע"י טיפול-שיפוטי-מצטבר לתת-הקבוצה החיובית, שער-היו"ר נשאר חובה לזנב ולשלילי. + X11 באינדקס. - Opus 4.8 @ xhigh כמודל חילוץ הלכות (config HALACHA_EXTRACT_MODEL/EFFORT, env-tunable; claude_session model/effort params; halacha_extractor מחווט). מבוסס A/B 2026-05-31: פחות חילוץ-יתר, 100% quote-verified, ביטחון מכויל. - scripts/ab_halacha_opus48.py — harness A/B לא-הרסני להשוואת מודל/effort בחילוץ הלכות. - .taskmaster #70 (FU-2c-b) — תיעוד dedup שפר + סריקת-קורפוס (0 stubs תקועים נותרו). תנאי-קדם (זהות נקייה) הושלם: שפר מוזג לרשומה קנונית + סריקת 128 רשומות. audit-findings גלויים ב-X11 §7: קישור הלכה↔ציטוט + סיווג-טיפול = greenfield, ל-implementation plan. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
333 lines
13 KiB
Python
333 lines
13 KiB
Python
"""Claude Code session bridge — runs prompts via the local `claude` CLI.

All LLM calls in legal-ai go through this module. We shell out to the local
Claude Code CLI, which uses the developer's claude.ai session, so there is no
direct API cost.

**Architectural rule (do not violate):** this module only works when invoked
from the local MCP server (the Python process at
`/home/chaim/legal-ai/mcp-server/`, launched per `~/.claude.json`). It will
**not** work when called from the legal-ai Docker container — that container
has no `claude` CLI and no claude.ai session. Any code path under `web/`
(FastAPI) that calls this module — directly or via an extractor such as
`halacha_extractor`, `claims_extractor`, `precedent_metadata_extractor`,
`block_writer`, `qa_validator`, `learning_loop`, `local_classifier`,
`appraiser_facts_extractor`, `brainstorm`, or `style_analyzer` — is wrong.
LLM-dependent operations must be exposed as MCP tools and triggered from
agents (or by the chair via Claude Code), where this module runs locally with
CLI access.

Async history: the module was originally synchronous (``subprocess.run``)
with a 120 s timeout. That broke for large legal documents: the synchronous
subprocess stalled the asyncio loop, and 120 s was far too short for
cold-cache Hebrew prompts (case 8174-24 hit three timeouts in a row). It was
fixed by going async with a 30-minute ceiling.
"""

from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
|
|
from legal_mcp.config import parse_llm_json
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Default ceiling for any single ``claude -p`` invocation, in seconds.
|
|
# 30 min covers any single-document call we make in practice (chunking
|
|
# handles the rest); the bound exists only to prevent runaway zombies.
|
|
DEFAULT_TIMEOUT = 1800
|
|
LONG_TIMEOUT = 3600 # opus block writing on full case context
|
|
|
|
|
|
async def query(
    prompt: str,
    timeout: int = DEFAULT_TIMEOUT,
    max_turns: int = 1,
    *,
    system: str | None = None,
    model: str | None = None,
    effort: str | None = None,
) -> str:
    """Send a prompt to Claude Code headless and return the text response.

    Passes the prompt via stdin (not argv) to avoid the OS ARG_MAX limit —
    prompts can be 500K+ chars when analyzing a full style corpus.

    The ``claude`` child process is killed and reaped on every path that
    leaves it running: a timeout, and also cancellation of the task awaiting
    this coroutine. An abandoned call therefore cannot leave a CLI process
    working in the background.

    Args:
        prompt: The prompt to send.
        timeout: Max seconds before the subprocess is killed.
        max_turns: Max conversation turns (1 = single response).
        system: Optional repeated-instruction text. Prepended to ``prompt``
            for the CLI. It is not passed as a separate arg because the
            CLI doesn't expose API-level caching. The parameter exists so
            extractors can structure their calls cleanly today, and to make
            a future SDK-backed path drop-in.
        model: Optional model alias/id (e.g. ``claude-opus-4-8``). When set,
            passed as ``--model``; otherwise the CLI's session default is
            used. Lets quality-sensitive extractors (halacha) pin a stronger
            model without changing the default for every caller.
        effort: Optional effort level (``low``/``medium``/``high``/``xhigh``/
            ``max``). When set, passed as ``--effort``. Pairs with ``model``.
            An empty string is treated as "unset" (CLI default).

    Returns:
        The ``result`` text from the CLI's JSON envelope. If stdout is not a
        JSON object with a ``result`` key, the raw (stripped) stdout is
        returned instead.

    Raises:
        RuntimeError: if the CLI is unavailable (e.g., called from the
            container — see module docstring), exits non-zero, returns an
            empty response, flags an error in its JSON envelope
            (``is_error``), or times out.
    """
    full_prompt = f"{system}\n\n{prompt}" if system else prompt
    prompt_len = len(full_prompt)

    if prompt_len > 150_000:
        logger.warning("Large prompt: %d chars — may hit context limits", prompt_len)

    cmd = [
        "claude", "-p",
        "--output-format", "json",
        "--max-turns", str(max_turns),
    ]
    if model:
        cmd += ["--model", model]
    if effort:  # "" behaves like None: fall back to the CLI default
        cmd += ["--effort", effort]

    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
    except FileNotFoundError as exc:
        raise RuntimeError(
            "Claude CLI not found. This module only works when invoked "
            "from the local MCP server — see the architectural rule in "
            "the module docstring. If this error came from a FastAPI "
            "endpoint in the container, refactor the call into an MCP "
            "tool that the chair triggers from Claude Code."
        ) from exc

    try:
        stdout_b, stderr_b = await asyncio.wait_for(
            proc.communicate(input=full_prompt.encode("utf-8")),
            timeout=timeout,
        )
    except asyncio.TimeoutError:
        raise RuntimeError(f"Claude CLI timed out after {timeout}s") from None
    finally:
        # wait_for cancellation alone leaves the child running. This runs on
        # timeout and also when the awaiting task is cancelled (CancelledError
        # is deliberately not caught above). On success communicate() has
        # already waited for the process, so returncode is set and this is
        # a no-op.
        if proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # exited between the check and the kill
            await proc.wait()

    if proc.returncode != 0:
        stderr = stderr_b.decode("utf-8", errors="replace").strip()[:500] or "unknown error"
        size_info = f"; prompt_len={prompt_len:,} chars" if prompt_len > 100_000 else ""
        raise RuntimeError(f"Claude CLI failed (exit {proc.returncode}): {stderr}{size_info}")

    stdout = stdout_b.decode("utf-8", errors="replace").strip()
    if not stdout:
        raise RuntimeError("Claude CLI returned empty response")

    # claude -p --output-format json prints a result envelope, e.g.
    # {"type": "result", "subtype": "success", "is_error": false, "result": "..."}
    try:
        data = json.loads(stdout)
    except json.JSONDecodeError:
        return stdout  # not JSON: hand back the raw text
    if not isinstance(data, dict):
        return stdout
    if data.get("is_error"):
        # Per the Claude Code headless docs, error envelopes (e.g. subtype
        # "error_max_turns") either lack ``result`` or carry an error message
        # in it. Returning either as model output would give callers (and
        # query_json's parser) a bogus answer.
        detail = data.get("result") or data.get("subtype") or "unknown error"
        raise RuntimeError(f"Claude CLI reported an error: {str(detail)[:500]}")
    if "result" in data:
        return data["result"]
    return stdout


async def query_json(
    prompt: str,
    timeout: int = DEFAULT_TIMEOUT,
    *,
    system: str | None = None,
    model: str | None = None,
    effort: str | None = None,
) -> dict | list | None:
    """Run :func:`query` and decode its reply as JSON.

    Decoding is delegated to ``parse_llm_json``, which copes with
    markdown-wrapped and truncated model output. ``system``, ``model`` and
    ``effort`` are handed to :func:`query` unchanged (see its docstring);
    ``max_turns`` keeps :func:`query`'s default.
    """
    response_text = await query(
        prompt,
        timeout=timeout,
        system=system,
        model=model,
        effort=effort,
    )
    parsed = parse_llm_json(response_text)
    return parsed


# ── Streaming + session continuation ────────────────────────────────

# StreamReader buffer limit for the streaming subprocess. asyncio's default
# (64 KiB) is too small for stream-json output: a single event line can carry
# a whole tool result or a long assistant message, and a line longer than the
# limit makes ``readline()`` raise ValueError instead of returning it.
_STREAM_LINE_LIMIT = 64 * 1024 * 1024

# Cap on how much stderr is kept; it is only used to build error messages.
_STDERR_KEEP_BYTES = 64 * 1024

# Seconds to wait for the CLI to exit, or for its stderr to reach EOF, before
# giving up and moving on to cleanup.
_EXIT_GRACE_S = 5.0


async def query_streaming(
    prompt: str,
    *,
    system: str | None = None,
    resume_session_id: str | None = None,
    timeout: int = LONG_TIMEOUT,
    cwd: str | None = None,
    model: str | None = None,
    effort: str | None = None,
):
    """Stream Claude's response as an async iterator of events.

    Wraps ``claude -p --output-format stream-json --verbose`` (newline-
    delimited JSON objects from the CLI) and projects each line onto a small,
    stable shape. The chat service / SSE proxy can forward these events
    without leaking CLI internals to the browser.

    Event shapes yielded:
        {"type": "session_id", "value": "<uuid>"}    # once, as soon as the CLI reports it; used for resume
        {"type": "text_delta", "text": "<partial>"}  # incremental assistant text
        {"type": "tool_use", "name": "...", "input": {...}}
        {"type": "error", "message": "..."}
        {"type": "done", "text": "<full response>"}

    Once the CLI has started and accepted the prompt, ``done`` is always the
    last event. It still arrives after an ``error`` (timeout, non-zero exit,
    CLI-reported failure, oversized line). If the CLI binary is missing, or
    the CLI closes stdin before reading the prompt, only a single ``error``
    is yielded and the stream ends without ``done``.

    The CLI emits a richer stream; we project to this minimal set so the
    front-end can stay stable across CLI upgrades.

    Args:
        prompt: The user message to send.
        system: Optional system instructions. Used only when starting a
            fresh conversation; when resume_session_id is set, the session
            already carries its system prompt.
        resume_session_id: Continue a prior conversation. When given,
            the system prompt is not re-sent; the CLI loads the
            entire conversation history from disk.
        timeout: Hard ceiling, in seconds, on reading the response stream
            (counted from after the prompt has been written).
        cwd: Working directory for the subprocess. ``None`` inherits this
            process's working directory; it does *not* switch to ``$HOME``.
            Callers that need the CLI to run from a particular directory
            must pass it explicitly.
        model: Optional model alias/id, passed as ``--model`` when set
            (same semantics as :func:`query`).
        effort: Optional effort level, passed as ``--effort`` when set.
            An empty string is treated as unset.
    """
    cmd = [
        "claude", "-p",
        "--output-format", "stream-json",
        "--verbose",
    ]
    if resume_session_id:
        # When resuming, system is already baked into the on-disk session
        # — sending it again would be a no-op at best and confuse the
        # conversation at worst.
        full_prompt = prompt
        cmd += ["--resume", resume_session_id]
    else:
        full_prompt = f"{system}\n\n{prompt}" if system else prompt
    if model:
        cmd += ["--model", model]
    if effort:
        cmd += ["--effort", effort]

    if len(full_prompt) > 200_000:
        logger.warning(
            "Streaming: large prompt (%d chars) — may hit CLI input limits",
            len(full_prompt),
        )

    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=cwd,
            limit=_STREAM_LINE_LIMIT,
        )
    except FileNotFoundError:
        yield {
            "type": "error",
            "message": (
                "Claude CLI not found on host — legal-chat-service must "
                "run where the `claude` binary is installed (Daphna's host, "
                "not the legal-ai container)."
            ),
        }
        return

    assert proc.stdin is not None  # for type checkers
    assert proc.stdout is not None
    assert proc.stderr is not None
    stderr_stream = proc.stderr
    stderr_head = bytearray()

    async def _drain_stderr() -> None:
        # Reads stderr to EOF, keeping only the first _STDERR_KEEP_BYTES.
        # Nothing else reads stderr while stdout is being streamed; without
        # this reader a full stderr pipe could block the CLI mid-response.
        try:
            while True:
                chunk = await stderr_stream.read(65536)
                if not chunk:
                    return
                room = _STDERR_KEEP_BYTES - len(stderr_head)
                if room > 0:
                    stderr_head.extend(chunk[:room])
        except Exception:  # best-effort capture; never fail the stream over it
            logger.debug("stderr drain failed", exc_info=True)

    stderr_task = asyncio.create_task(_drain_stderr())

    async def _stderr_text(max_chars: int) -> str:
        # Returns captured stderr, first giving the reader a moment to reach
        # EOF so output written just before exit is included.
        await asyncio.wait({stderr_task}, timeout=_EXIT_GRACE_S)
        return bytes(stderr_head).decode("utf-8", errors="replace")[:max_chars]

    loop = asyncio.get_running_loop()
    accumulated_text: list[str] = []
    session_id_emitted = False
    try:
        # Send the prompt and close stdin so the CLI knows the user message
        # is complete. This sits inside the outer try so the child is cleaned
        # up even when writing fails.
        try:
            proc.stdin.write(full_prompt.encode("utf-8"))
            await proc.stdin.drain()
            proc.stdin.close()
        except (BrokenPipeError, ConnectionResetError):
            # CLI exited before reading the prompt (asyncio may surface this
            # as either exception). Report its stderr and bail.
            early_stderr = await _stderr_text(300)
            yield {
                "type": "error",
                "message": f"Claude CLI closed stdin early: {early_stderr}",
            }
            return

        deadline = loop.time() + timeout
        while True:
            remaining = deadline - loop.time()
            if remaining <= 0:
                yield {"type": "error", "message": f"timed out after {timeout}s"}
                break
            try:
                line_b = await asyncio.wait_for(proc.stdout.readline(), timeout=remaining)
            except asyncio.TimeoutError:
                yield {"type": "error", "message": f"stream timed out after {timeout}s"}
                break
            except ValueError:
                # A line exceeded _STREAM_LINE_LIMIT; the reader cannot resync.
                yield {
                    "type": "error",
                    "message": "Claude CLI emitted a stream line over the read limit",
                }
                break
            if not line_b:
                # EOF without a final `result` event. Let the CLI exit on its
                # own so a crash is reported instead of the stream ending
                # silently.
                try:
                    await asyncio.wait_for(proc.wait(), timeout=_EXIT_GRACE_S)
                except asyncio.TimeoutError:
                    pass  # still running; the finally block kills it
                if proc.returncode not in (None, 0):
                    detail = (await _stderr_text(300)).strip() or "unknown error"
                    yield {
                        "type": "error",
                        "message": f"Claude CLI failed (exit {proc.returncode}): {detail}",
                    }
                break
            line = line_b.decode("utf-8", errors="replace").strip()
            if not line:
                continue
            try:
                event = json.loads(line)
            except json.JSONDecodeError:
                # Stray non-JSON line from CLI — surface a snippet for debug.
                logger.debug("non-JSON stream line: %s", line[:120])
                continue
            if not isinstance(event, dict):
                logger.debug("non-object stream line: %s", line[:120])
                continue

            # The CLI's stream-json emits several event types. We only
            # project the ones the chat service forwards.
            t = event.get("type")
            if not session_id_emitted:
                sid = event.get("session_id")
                if sid:
                    session_id_emitted = True
                    yield {"type": "session_id", "value": sid}

            if t == "assistant":
                # event["message"]["content"] is a list of blocks; we extract
                # text blocks and tool_use blocks.
                msg = event.get("message") or {}
                for block in msg.get("content") or []:
                    btype = block.get("type")
                    if btype == "text":
                        text = block.get("text") or ""
                        if text:
                            accumulated_text.append(text)
                            yield {"type": "text_delta", "text": text}
                    elif btype == "tool_use":
                        yield {
                            "type": "tool_use",
                            "name": block.get("name") or "",
                            "input": block.get("input") or {},
                        }
            elif t == "result":
                # Final synthesized result line from the CLI. The deltas were
                # already delivered, so only surface a CLI-reported failure
                # and stop here.
                if event.get("is_error"):
                    detail = event.get("result") or event.get("subtype") or "unknown error"
                    yield {
                        "type": "error",
                        "message": f"Claude CLI reported an error: {str(detail)[:300]}",
                    }
                break
    finally:
        if proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # exited between the check and the kill
        try:
            await proc.wait()
        except Exception:
            pass  # best-effort reap; don't mask the original outcome
        # Once the child is gone its stderr reaches EOF; don't strand the reader.
        await asyncio.wait({stderr_task}, timeout=_EXIT_GRACE_S)
        if not stderr_task.done():
            stderr_task.cancel()

    yield {"type": "done", "text": "".join(accumulated_text)}