fix(precedents): Anthropic SDK fallback, format() crash, UI refresh
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 1m31s

Three fixes to the precedent library after the first end-to-end test on
403-17 surfaced runtime issues:

1. Anthropic SDK fallback in claude_session. The legal-ai Docker container
   does not ship the `claude` CLI, so every halacha and metadata extraction
   was failing with "Claude CLI not found." Module now tries the CLI first
   (zero-cost local path) and falls back to the Anthropic SDK with
   ANTHROPIC_API_KEY when the binary is absent. Default model is
   claude-sonnet-4-6, overridable via CLAUDE_SDK_MODEL env. The system
   message gets cache_control: ephemeral so multi-chunk runs reuse the
   cached instruction prefix at ~10% read cost. Adds `anthropic` to
   pyproject deps.

2. precedent_metadata_extractor crashed with KeyError because the JSON
   example inside the prompt template contained literal { } characters
   that str.format() interpreted as placeholders. Switched to f-string
   concatenation; the prompt template no longer needs format() at all.

3. Library list query stays stale after upload because the upload
   mutation's onSuccess fires when the POST returns task_id, not when
   SSE reports completion. Added a second invalidate inside the SSE
   watcher in PrecedentUploadSheet so the new row appears with up-to-date
   chunk and halachot counts the moment processing finishes.

Halacha and metadata extractors now route the long static prompt through
the new `system=` parameter so the SDK path actually caches it; the CLI
path concatenates and behaves as before.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-03 10:52:31 +00:00
parent 73a79ea7e8
commit 5d836ca414
5 changed files with 198 additions and 55 deletions

View File

@@ -8,6 +8,7 @@ dependencies = [
"asyncpg>=0.29.0",
"pgvector>=0.3.0",
"voyageai>=0.3.0",
"anthropic>=0.45.0",
"python-dotenv>=1.0.0",
"pydantic>=2.0.0",
"pymupdf>=1.25.0",

View File

@@ -1,21 +1,26 @@
"""Claude Code session bridge — runs prompts via `claude -p` instead of API.
"""Claude Code session bridge — runs prompts via `claude -p` or Anthropic SDK.
All LLM calls in the project go through this module. We shell out to the
local Claude Code CLI which uses the developer's claude.ai session — zero
direct Anthropic API cost.
History: originally shelled out to `claude -p` exclusively (zero direct API
cost via the developer's claude.ai session). That works locally but fails
in the legal-ai Docker container, which does not ship the CLI. To keep the
same call sites working in production, the module now tries the CLI first
and falls back to the Anthropic SDK using ``ANTHROPIC_API_KEY`` when the
CLI binary is absent.
History: this module was originally synchronous (``subprocess.run``) with
a 120-second timeout. That broke for large legal documents:
Both paths share a single shape: ``query()`` returns text, ``query_json()``
parses that text as JSON. Callers don't need to know which path executed.
1. Sync subprocess stalled the asyncio event loop in the MCP server
while a single LLM call was in flight.
2. 120 seconds was far too short. A 25K-character Hebrew appeal on cold
prompt cache routinely takes 130-180 seconds; we proved this in case
8174-24 (three timeouts in a row).
Async history: originally synchronous (``subprocess.run``) with a 120 s
timeout. That broke for large legal documents — sync subprocess stalled
the asyncio loop, and 120 s was far too short for cold-cache Hebrew prompts
(case 8174-24 hit three timeouts in a row). Fixed by going async with a
30-minute ceiling.
The fix: switch to async subprocess (non-blocking) and raise the default
ceiling to 30 minutes — long enough that no realistic document hits it,
but bounded so a runaway never zombifies forever.
Caching contract (SDK path): pass long, repeated instruction text via
``system=...``. The SDK path adds ``cache_control: ephemeral`` so back-to-back
chunk calls reuse the cached prefix at ~10% of read cost. The CLI path doesn't
expose API-level caching; with ``system`` set, we just prepend it to the
prompt — same observable behavior, no caching benefit.
"""
from __future__ import annotations
@@ -23,54 +28,82 @@ from __future__ import annotations
import asyncio
import json
import logging
import os
import shutil
from legal_mcp.config import parse_llm_json
logger = logging.getLogger(__name__)
# Default ceiling for any single ``claude -p`` invocation, in seconds.
# Default ceiling for any single LLM call, in seconds.
# 30 min covers any single-document call we make in practice (chunking
# handles the rest); the bound exists only to prevent runaway zombies.
DEFAULT_TIMEOUT = 1800
LONG_TIMEOUT = 3600 # opus block writing on full case context
# Anthropic SDK fallback config — used when `claude` CLI is not on PATH.
# Default to Sonnet 4.6: strong balance of Hebrew legal-text quality and
# cost for the per-chunk extraction workload. Override via env if needed.
DEFAULT_SDK_MODEL = os.environ.get("CLAUDE_SDK_MODEL", "claude-sonnet-4-6")
DEFAULT_SDK_MAX_TOKENS = int(os.environ.get("CLAUDE_SDK_MAX_TOKENS", "8192"))
async def query(prompt: str, timeout: int = DEFAULT_TIMEOUT, max_turns: int = 1) -> str:
"""Send a prompt to Claude Code headless and return the text response.
_anthropic_client = None
Passes the prompt via stdin (not argv) to avoid the OS ARG_MAX limit —
prompts can be 500K+ chars when analyzing a full style corpus.
Args:
prompt: The prompt to send.
timeout: Max seconds before the subprocess is killed.
max_turns: Max conversation turns (1 = single response).
def _has_cli() -> bool:
return shutil.which("claude") is not None
Returns:
The text response from Claude.
Raises:
RuntimeError: If claude CLI is not available, fails, or times out.
def _get_anthropic_client():
"""Lazy-init the AsyncAnthropic client. Raises with a clear message if
the package or API key is missing — better than letting the SDK 401 in
the middle of a multi-chunk extraction.
"""
global _anthropic_client
if _anthropic_client is not None:
return _anthropic_client
try:
import anthropic
except ImportError as e:
raise RuntimeError(
"The 'anthropic' package is required when the Claude CLI is "
"unavailable. Add it to mcp-server/pyproject.toml."
) from e
if not os.environ.get("ANTHROPIC_API_KEY"):
raise RuntimeError(
"ANTHROPIC_API_KEY is not set; cannot fall back to Anthropic SDK."
)
_anthropic_client = anthropic.AsyncAnthropic()
return _anthropic_client
async def _query_cli(
prompt: str, system: str | None, timeout: int, max_turns: int,
) -> str:
"""Run the prompt via the local `claude` CLI subprocess.
Uses the developer's claude.ai session — zero direct API cost. With
``system`` set, we just prepend it to the prompt; the CLI doesn't
expose API-level caching anyway.
"""
full_prompt = f"{system}\n\n{prompt}" if system else prompt
cmd = [
"claude", "-p",
"--output-format", "json",
"--max-turns", str(max_turns),
]
try:
proc = await asyncio.create_subprocess_exec(
*cmd,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
except FileNotFoundError:
raise RuntimeError("Claude CLI not found. Install Claude Code or add 'claude' to PATH.")
proc = await asyncio.create_subprocess_exec(
*cmd,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
try:
stdout_b, stderr_b = await asyncio.wait_for(
proc.communicate(input=prompt.encode("utf-8")),
proc.communicate(input=full_prompt.encode("utf-8")),
timeout=timeout,
)
except asyncio.TimeoutError:
@@ -100,10 +133,104 @@ async def query(prompt: str, timeout: int = DEFAULT_TIMEOUT, max_turns: int = 1)
return stdout
async def query_json(prompt: str, timeout: int = DEFAULT_TIMEOUT) -> dict | list | None:
async def _query_sdk(prompt: str, system: str | None, timeout: int) -> str:
"""Run the prompt via the Anthropic SDK with 5-min ephemeral caching
on the system message. Streams the response to dodge HTTP read timeouts
on long Hebrew JSON outputs.
"""
import anthropic
client = _get_anthropic_client()
kwargs: dict = {
"model": DEFAULT_SDK_MODEL,
"max_tokens": DEFAULT_SDK_MAX_TOKENS,
"messages": [{"role": "user", "content": prompt}],
}
if system:
# cache_control: ephemeral → 5-min TTL. The same system text repeats
# across every chunk in an extraction run, so we get
# cache_read_input_tokens on every call after the first.
kwargs["system"] = [{
"type": "text",
"text": system,
"cache_control": {"type": "ephemeral"},
}]
try:
async with client.messages.stream(**kwargs) as stream:
message = await asyncio.wait_for(
stream.get_final_message(),
timeout=timeout,
)
except asyncio.TimeoutError:
raise RuntimeError(f"Anthropic SDK call timed out after {timeout}s")
except anthropic.APIError as e:
raise RuntimeError(f"Anthropic SDK call failed: {e}") from e
text_parts: list[str] = []
for block in message.content:
if getattr(block, "type", None) == "text":
text_parts.append(block.text)
out = "".join(text_parts).strip()
if not out:
raise RuntimeError("Anthropic SDK returned no text content")
usage = getattr(message, "usage", None)
if usage is not None:
logger.debug(
"claude_session SDK usage: input=%s cache_read=%s cache_write=%s output=%s",
getattr(usage, "input_tokens", None),
getattr(usage, "cache_read_input_tokens", None),
getattr(usage, "cache_creation_input_tokens", None),
getattr(usage, "output_tokens", None),
)
return out
async def query(
prompt: str,
timeout: int = DEFAULT_TIMEOUT,
max_turns: int = 1,
*,
system: str | None = None,
) -> str:
"""Send a prompt to Claude and return the text response.
Tries the Claude CLI first (zero API cost, uses claude.ai session).
Falls back to the Anthropic SDK with ANTHROPIC_API_KEY when the CLI is
absent — this is the production-Docker path.
Pass repeating instruction text via ``system=`` so the SDK path can
cache it (5-min ephemeral). Pass only the per-chunk content via
``prompt``. The CLI path concatenates them; the SDK path keeps them
separate so caching works.
Args:
prompt: The user-facing prompt text.
timeout: Max seconds before the call is aborted.
max_turns: CLI-only — max conversation turns (1 = single response).
system: Optional system message. With the SDK path, gets cached
with 5-min ephemeral TTL when set.
Raises:
RuntimeError: if both paths fail or time out. The message includes
which path raised so the caller can distinguish CLI from SDK.
"""
if _has_cli():
return await _query_cli(prompt, system, timeout, max_turns)
return await _query_sdk(prompt, system, timeout)
async def query_json(
prompt: str,
timeout: int = DEFAULT_TIMEOUT,
*,
system: str | None = None,
) -> dict | list | None:
"""Send a prompt and parse the response as JSON.
Uses parse_llm_json for robust parsing (handles markdown wrapping, truncation).
"""
raw = await query(prompt, timeout=timeout)
raw = await query(prompt, timeout=timeout, system=system)
return parse_llm_json(raw)

View File

@@ -279,8 +279,9 @@ async def _extract_chunk(
else HALACHA_EXTRACTION_PROMPT_PERSUASIVE
)
chunk_label = f" (חלק {chunk_index + 1}/{chunk_total})" if chunk_total > 1 else ""
prompt = (
f"{base_prompt}\n\n"
# Pass the static instruction prompt as `system` so the SDK path can cache
# it (5-min ephemeral). Only the per-chunk content varies via `prompt`.
user_msg = (
f"## הקלט\n"
f"סוג קטע: {section_type}\n"
f"{context}{chunk_label}\n\n"
@@ -289,7 +290,7 @@ async def _extract_chunk(
last_err: Exception | None = None
for attempt in range(CHUNK_RETRY_ATTEMPTS + 1):
try:
result = await claude_session.query_json(prompt)
result = await claude_session.query_json(user_msg, system=base_prompt)
except Exception as e:
last_err = e
logger.warning(

View File

@@ -28,6 +28,10 @@ _HEAD_CHARS = 12_000
_TAIL_CHARS = 6_000
# Note: this template is concatenated with f-strings at call-time rather
# than using .format(), because the JSON example below contains '{' / '}'
# which str.format would interpret as placeholders and crash with
# KeyError on the field names.
METADATA_EXTRACTION_PROMPT = """אתה מסייע משפטי בכיר. קרא את פסק הדין/ההחלטה הבא וחלץ ממנו מטא-דאטה לקטלוג הקורפוס.
המטרה: למלא שדות בטופס העלאה שהמשתמש הזין באופן חלקי. **אל תמציא** — אם המידע לא מופיע בטקסט, השאר ריק (מחרוזת ריקה / מערך ריק).
@@ -51,13 +55,6 @@ METADATA_EXTRACTION_PROMPT = """אתה מסייע משפטי בכיר. קרא א
4. **headnote** — לא מצטטים, מסכמים. סגנון נבו: ביטוי קצר אחד.
5. **key_quote** — חייב להיות הדבקה מילולית מהקלט. אם אין ציטוט בולט — השאר ריק.
6. **subject_tags** — 3-7 תגיות בעברית, snake_case (חניה, קווי_בניין, שיקול_דעת, פגם_פרוצדורלי, סמכות, מועדים, פגיעה_במקרקעין, ירידת_ערך, תכנית_רחביה, מימוש_במכר, וכד'). שייך לתחום של ועדת ערר תכנון ובניה.
## הקלט
{context}
--- תחילת הטקסט ---
{text_window}
--- סוף הטקסט ---
"""
@@ -104,12 +101,18 @@ async def extract_metadata(case_law_id: UUID | str) -> dict:
f"תאריך: {date_str}\n"
f"תחום: {practice_area}"
)
prompt = METADATA_EXTRACTION_PROMPT.format(
context=context, text_window=_build_text_window(full_text),
text_window = _build_text_window(full_text)
# Static instructions go via `system` so the SDK path can cache them
# across uploads. Per-precedent content goes in the user prompt.
user_msg = (
f"## הקלט\n{context}\n\n"
f"--- תחילת הטקסט ---\n{text_window}\n--- סוף הטקסט ---"
)
try:
result = await claude_session.query_json(prompt)
result = await claude_session.query_json(
user_msg, system=METADATA_EXTRACTION_PROMPT,
)
except Exception as e:
logger.warning("precedent_metadata_extractor: query failed: %s", e)
return {}