From 5d836ca414aec8a943896b293a04d702f6f5f3a1 Mon Sep 17 00:00:00 2001 From: Chaim Date: Sun, 3 May 2026 10:52:31 +0000 Subject: [PATCH] fix(precedents): Anthropic SDK fallback, format() crash, UI refresh Three fixes to the precedent library after the first end-to-end test on 403-17 surfaced runtime issues: 1. Anthropic SDK fallback in claude_session. The legal-ai Docker container does not ship the `claude` CLI, so every halacha and metadata extraction was failing with "Claude CLI not found." Module now tries the CLI first (zero-cost local path) and falls back to the Anthropic SDK with ANTHROPIC_API_KEY when the binary is absent. Default model is claude-sonnet-4-6, overridable via CLAUDE_SDK_MODEL env. The system message gets cache_control: ephemeral so multi-chunk runs reuse the cached instruction prefix at ~10% read cost. Adds `anthropic` to pyproject deps. 2. precedent_metadata_extractor crashed with KeyError because the JSON example inside the prompt template contained literal { } characters that str.format() interpreted as placeholders. Switched to f-string concatenation; the prompt template no longer needs format() at all. 3. Library list query stays stale after upload because the upload mutation's onSuccess fires when the POST returns task_id, not when SSE reports completion. Added a second invalidate inside the SSE watcher in PrecedentUploadSheet so the new row appears with up-to-date chunk and halachot counts the moment processing finishes. Halacha and metadata extractors now route the long static prompt through the new `system=` parameter so the SDK path actually caches it; the CLI path concatenates and behaves as before. Co-Authored-By: Claude Opus 4.7 (1M context) --- mcp-server/pyproject.toml | 1 + .../src/legal_mcp/services/claude_session.py | 205 ++++++++++++++---- .../legal_mcp/services/halacha_extractor.py | 7 +- .../services/precedent_metadata_extractor.py | 23 +- .../precedents/precedent-upload-sheet.tsx | 17 +- 5 files changed, 198 insertions(+), 55 deletions(-) diff --git a/mcp-server/pyproject.toml b/mcp-server/pyproject.toml index 2829d57..f60d38a 100644 --- a/mcp-server/pyproject.toml +++ b/mcp-server/pyproject.toml @@ -8,6 +8,7 @@ dependencies = [ "asyncpg>=0.29.0", "pgvector>=0.3.0", "voyageai>=0.3.0", + "anthropic>=0.45.0", "python-dotenv>=1.0.0", "pydantic>=2.0.0", "pymupdf>=1.25.0", diff --git a/mcp-server/src/legal_mcp/services/claude_session.py b/mcp-server/src/legal_mcp/services/claude_session.py index b1014a7..757b76c 100644 --- a/mcp-server/src/legal_mcp/services/claude_session.py +++ b/mcp-server/src/legal_mcp/services/claude_session.py @@ -1,21 +1,26 @@ -"""Claude Code session bridge — runs prompts via `claude -p` instead of API. +"""Claude Code session bridge — runs prompts via `claude -p` or Anthropic SDK. -All LLM calls in the project go through this module. We shell out to the -local Claude Code CLI which uses the developer's claude.ai session — zero -direct Anthropic API cost. +History: originally shelled out to `claude -p` exclusively (zero direct API +cost via the developer's claude.ai session). That works locally but fails +in the legal-ai Docker container, which does not ship the CLI. To keep the +same call sites working in production, the module now tries the CLI first +and falls back to the Anthropic SDK using ``ANTHROPIC_API_KEY`` when the +CLI binary is absent. -History: this module was originally synchronous (``subprocess.run``) with -a 120-second timeout. That broke for large legal documents: +Both paths share a single shape: ``query()`` returns text, ``query_json()`` +parses that text as JSON. Callers don't need to know which path executed. - 1. Sync subprocess stalled the asyncio event loop in the MCP server - while a single LLM call was in flight. - 2. 120 seconds was far too short. A 25K-character Hebrew appeal on cold - prompt cache routinely takes 130-180 seconds; we proved this in case - 8174-24 (three timeouts in a row). +Async history: originally synchronous (``subprocess.run``) with a 120 s +timeout. That broke for large legal documents — sync subprocess stalled +the asyncio loop, and 120 s was far too short for cold-cache Hebrew prompts +(case 8174-24 hit three timeouts in a row). Fixed by going async with a +30-minute ceiling. -The fix: switch to async subprocess (non-blocking) and raise the default -ceiling to 30 minutes — long enough that no realistic document hits it, -but bounded so a runaway never zombifies forever. +Caching contract (SDK path): pass long, repeated instruction text via +``system=...``. The SDK path adds ``cache_control: ephemeral`` so back-to-back +chunk calls reuse the cached prefix at ~10% of read cost. The CLI path doesn't +expose API-level caching; with ``system`` set, we just prepend it to the +prompt — same observable behavior, no caching benefit. """ from __future__ import annotations @@ -23,54 +28,82 @@ from __future__ import annotations import asyncio import json import logging +import os +import shutil from legal_mcp.config import parse_llm_json logger = logging.getLogger(__name__) -# Default ceiling for any single ``claude -p`` invocation, in seconds. +# Default ceiling for any single LLM call, in seconds. # 30 min covers any single-document call we make in practice (chunking # handles the rest); the bound exists only to prevent runaway zombies. DEFAULT_TIMEOUT = 1800 LONG_TIMEOUT = 3600 # opus block writing on full case context +# Anthropic SDK fallback config — used when `claude` CLI is not on PATH. +# Default to Sonnet 4.6: strong balance of Hebrew legal-text quality and +# cost for the per-chunk extraction workload. Override via env if needed. +DEFAULT_SDK_MODEL = os.environ.get("CLAUDE_SDK_MODEL", "claude-sonnet-4-6") +DEFAULT_SDK_MAX_TOKENS = int(os.environ.get("CLAUDE_SDK_MAX_TOKENS", "8192")) -async def query(prompt: str, timeout: int = DEFAULT_TIMEOUT, max_turns: int = 1) -> str: - """Send a prompt to Claude Code headless and return the text response. +_anthropic_client = None - Passes the prompt via stdin (not argv) to avoid the OS ARG_MAX limit — - prompts can be 500K+ chars when analyzing a full style corpus. - Args: - prompt: The prompt to send. - timeout: Max seconds before the subprocess is killed. - max_turns: Max conversation turns (1 = single response). +def _has_cli() -> bool: + return shutil.which("claude") is not None - Returns: - The text response from Claude. - Raises: - RuntimeError: If claude CLI is not available, fails, or times out. +def _get_anthropic_client(): + """Lazy-init the AsyncAnthropic client. Raises with a clear message if + the package or API key is missing — better than letting the SDK 401 in + the middle of a multi-chunk extraction. """ + global _anthropic_client + if _anthropic_client is not None: + return _anthropic_client + try: + import anthropic + except ImportError as e: + raise RuntimeError( + "The 'anthropic' package is required when the Claude CLI is " + "unavailable. Add it to mcp-server/pyproject.toml." + ) from e + if not os.environ.get("ANTHROPIC_API_KEY"): + raise RuntimeError( + "ANTHROPIC_API_KEY is not set; cannot fall back to Anthropic SDK." + ) + _anthropic_client = anthropic.AsyncAnthropic() + return _anthropic_client + + +async def _query_cli( + prompt: str, system: str | None, timeout: int, max_turns: int, +) -> str: + """Run the prompt via the local `claude` CLI subprocess. + + Uses the developer's claude.ai session — zero direct API cost. With + ``system`` set, we just prepend it to the prompt; the CLI doesn't + expose API-level caching anyway. + """ + full_prompt = f"{system}\n\n{prompt}" if system else prompt + cmd = [ "claude", "-p", "--output-format", "json", "--max-turns", str(max_turns), ] - try: - proc = await asyncio.create_subprocess_exec( - *cmd, - stdin=asyncio.subprocess.PIPE, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - except FileNotFoundError: - raise RuntimeError("Claude CLI not found. Install Claude Code or add 'claude' to PATH.") + proc = await asyncio.create_subprocess_exec( + *cmd, + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) try: stdout_b, stderr_b = await asyncio.wait_for( - proc.communicate(input=prompt.encode("utf-8")), + proc.communicate(input=full_prompt.encode("utf-8")), timeout=timeout, ) except asyncio.TimeoutError: @@ -100,10 +133,104 @@ async def query(prompt: str, timeout: int = DEFAULT_TIMEOUT, max_turns: int = 1) return stdout -async def query_json(prompt: str, timeout: int = DEFAULT_TIMEOUT) -> dict | list | None: +async def _query_sdk(prompt: str, system: str | None, timeout: int) -> str: + """Run the prompt via the Anthropic SDK with 5-min ephemeral caching + on the system message. Streams the response to dodge HTTP read timeouts + on long Hebrew JSON outputs. + """ + import anthropic + + client = _get_anthropic_client() + + kwargs: dict = { + "model": DEFAULT_SDK_MODEL, + "max_tokens": DEFAULT_SDK_MAX_TOKENS, + "messages": [{"role": "user", "content": prompt}], + } + if system: + # cache_control: ephemeral → 5-min TTL. The same system text repeats + # across every chunk in an extraction run, so we get + # cache_read_input_tokens on every call after the first. + kwargs["system"] = [{ + "type": "text", + "text": system, + "cache_control": {"type": "ephemeral"}, + }] + + try: + async with client.messages.stream(**kwargs) as stream: + message = await asyncio.wait_for( + stream.get_final_message(), + timeout=timeout, + ) + except asyncio.TimeoutError: + raise RuntimeError(f"Anthropic SDK call timed out after {timeout}s") + except anthropic.APIError as e: + raise RuntimeError(f"Anthropic SDK call failed: {e}") from e + + text_parts: list[str] = [] + for block in message.content: + if getattr(block, "type", None) == "text": + text_parts.append(block.text) + out = "".join(text_parts).strip() + if not out: + raise RuntimeError("Anthropic SDK returned no text content") + + usage = getattr(message, "usage", None) + if usage is not None: + logger.debug( + "claude_session SDK usage: input=%s cache_read=%s cache_write=%s output=%s", + getattr(usage, "input_tokens", None), + getattr(usage, "cache_read_input_tokens", None), + getattr(usage, "cache_creation_input_tokens", None), + getattr(usage, "output_tokens", None), + ) + return out + + +async def query( + prompt: str, + timeout: int = DEFAULT_TIMEOUT, + max_turns: int = 1, + *, + system: str | None = None, +) -> str: + """Send a prompt to Claude and return the text response. + + Tries the Claude CLI first (zero API cost, uses claude.ai session). + Falls back to the Anthropic SDK with ANTHROPIC_API_KEY when the CLI is + absent — this is the production-Docker path. + + Pass repeating instruction text via ``system=`` so the SDK path can + cache it (5-min ephemeral). Pass only the per-chunk content via + ``prompt``. The CLI path concatenates them; the SDK path keeps them + separate so caching works. + + Args: + prompt: The user-facing prompt text. + timeout: Max seconds before the call is aborted. + max_turns: CLI-only — max conversation turns (1 = single response). + system: Optional system message. With the SDK path, gets cached + with 5-min ephemeral TTL when set. + + Raises: + RuntimeError: if both paths fail or time out. The message includes + which path raised so the caller can distinguish CLI from SDK. + """ + if _has_cli(): + return await _query_cli(prompt, system, timeout, max_turns) + return await _query_sdk(prompt, system, timeout) + + +async def query_json( + prompt: str, + timeout: int = DEFAULT_TIMEOUT, + *, + system: str | None = None, +) -> dict | list | None: """Send a prompt and parse the response as JSON. Uses parse_llm_json for robust parsing (handles markdown wrapping, truncation). """ - raw = await query(prompt, timeout=timeout) + raw = await query(prompt, timeout=timeout, system=system) return parse_llm_json(raw) diff --git a/mcp-server/src/legal_mcp/services/halacha_extractor.py b/mcp-server/src/legal_mcp/services/halacha_extractor.py index 9917615..d3267f4 100644 --- a/mcp-server/src/legal_mcp/services/halacha_extractor.py +++ b/mcp-server/src/legal_mcp/services/halacha_extractor.py @@ -279,8 +279,9 @@ async def _extract_chunk( else HALACHA_EXTRACTION_PROMPT_PERSUASIVE ) chunk_label = f" (חלק {chunk_index + 1}/{chunk_total})" if chunk_total > 1 else "" - prompt = ( - f"{base_prompt}\n\n" + # Pass the static instruction prompt as `system` so the SDK path can cache + # it (5-min ephemeral). Only the per-chunk content varies via `prompt`. + user_msg = ( f"## הקלט\n" f"סוג קטע: {section_type}\n" f"{context}{chunk_label}\n\n" @@ -289,7 +290,7 @@ async def _extract_chunk( last_err: Exception | None = None for attempt in range(CHUNK_RETRY_ATTEMPTS + 1): try: - result = await claude_session.query_json(prompt) + result = await claude_session.query_json(user_msg, system=base_prompt) except Exception as e: last_err = e logger.warning( diff --git a/mcp-server/src/legal_mcp/services/precedent_metadata_extractor.py b/mcp-server/src/legal_mcp/services/precedent_metadata_extractor.py index 29f5122..be7694b 100644 --- a/mcp-server/src/legal_mcp/services/precedent_metadata_extractor.py +++ b/mcp-server/src/legal_mcp/services/precedent_metadata_extractor.py @@ -28,6 +28,10 @@ _HEAD_CHARS = 12_000 _TAIL_CHARS = 6_000 +# Note: this template is concatenated with f-strings at call-time rather +# than using .format(), because the JSON example below contains '{' / '}' +# which str.format would interpret as placeholders and crash with +# KeyError on the field names. METADATA_EXTRACTION_PROMPT = """אתה מסייע משפטי בכיר. קרא את פסק הדין/ההחלטה הבא וחלץ ממנו מטא-דאטה לקטלוג הקורפוס. המטרה: למלא שדות בטופס העלאה שהמשתמש הזין באופן חלקי. **אל תמציא** — אם המידע לא מופיע בטקסט, השאר ריק (מחרוזת ריקה / מערך ריק). @@ -51,13 +55,6 @@ METADATA_EXTRACTION_PROMPT = """אתה מסייע משפטי בכיר. קרא א 4. **headnote** — לא מצטטים, מסכמים. סגנון נבו: ביטוי קצר אחד. 5. **key_quote** — חייב להיות הדבקה מילולית מהקלט. אם אין ציטוט בולט — השאר ריק. 6. **subject_tags** — 3-7 תגיות בעברית, snake_case (חניה, קווי_בניין, שיקול_דעת, פגם_פרוצדורלי, סמכות, מועדים, פגיעה_במקרקעין, ירידת_ערך, תכנית_רחביה, מימוש_במכר, וכד'). שייך לתחום של ועדת ערר תכנון ובניה. - -## הקלט -{context} - ---- תחילת הטקסט --- -{text_window} ---- סוף הטקסט --- """ @@ -104,12 +101,18 @@ async def extract_metadata(case_law_id: UUID | str) -> dict: f"תאריך: {date_str}\n" f"תחום: {practice_area}" ) - prompt = METADATA_EXTRACTION_PROMPT.format( - context=context, text_window=_build_text_window(full_text), + text_window = _build_text_window(full_text) + # Static instructions go via `system` so the SDK path can cache them + # across uploads. Per-precedent content goes in the user prompt. + user_msg = ( + f"## הקלט\n{context}\n\n" + f"--- תחילת הטקסט ---\n{text_window}\n--- סוף הטקסט ---" ) try: - result = await claude_session.query_json(prompt) + result = await claude_session.query_json( + user_msg, system=METADATA_EXTRACTION_PROMPT, + ) except Exception as e: logger.warning("precedent_metadata_extractor: query failed: %s", e) return {} diff --git a/web-ui/src/components/precedents/precedent-upload-sheet.tsx b/web-ui/src/components/precedents/precedent-upload-sheet.tsx index 9320a6b..a00e240 100644 --- a/web-ui/src/components/precedents/precedent-upload-sheet.tsx +++ b/web-ui/src/components/precedents/precedent-upload-sheet.tsx @@ -3,6 +3,7 @@ import { useEffect, useState } from "react"; import { Upload, Loader2, CheckCircle2, AlertCircle } from "lucide-react"; import { toast } from "sonner"; +import { useQueryClient } from "@tanstack/react-query"; import { Sheet, SheetContent, SheetHeader, SheetTitle, SheetDescription, } from "@/components/ui/sheet"; @@ -14,7 +15,10 @@ import { Select, SelectContent, SelectItem, SelectTrigger, SelectValue, } from "@/components/ui/select"; import { Progress } from "@/components/ui/progress"; -import { useUploadPrecedent, type PracticeArea, type SourceType } from "@/lib/api/precedent-library"; +import { + useUploadPrecedent, libraryKeys, + type PracticeArea, type SourceType, +} from "@/lib/api/precedent-library"; import { useProgress } from "@/lib/api/documents"; import { PRACTICE_AREAS, PRECEDENT_LEVELS, SOURCE_TYPES, @@ -44,6 +48,7 @@ export function PrecedentUploadSheet({ open, onOpenChange }: Props) { const [taskId, setTaskId] = useState(null); const upload = useUploadPrecedent(); const progress = useProgress(taskId); + const qc = useQueryClient(); // Reset form when the sheet closes — fields, file input, and any in-flight // task subscription. We accept the cascade-render warning because resetting @@ -60,17 +65,23 @@ export function PrecedentUploadSheet({ open, onOpenChange }: Props) { setHeadnote(""); setIsBinding(true); setTaskId(null); }, [open]); - // Auto-close on completion + // Auto-close on completion + refresh library list/stats so the new + // row appears with up-to-date counts (halachot, approved). The mutation's + // onSuccess fires when POST returns the task_id; we need a second + // invalidation when SSE reports terminal status, otherwise the table + // shows stale data. useEffect(() => { if (progress?.status === "completed") { + qc.invalidateQueries({ queryKey: libraryKeys.all }); toast.success("הפסיקה הוכנסה לקורפוס. ההלכות ממתינות לאישור."); const t = window.setTimeout(() => onOpenChange(false), 1200); return () => window.clearTimeout(t); } if (progress?.status === "failed") { + qc.invalidateQueries({ queryKey: libraryKeys.all }); toast.error(`כשל בעיבוד: ${progress.error || "שגיאה לא ידועה"}`); } - }, [progress, onOpenChange]); + }, [progress, onOpenChange, qc]); const onSubmit = async (e: React.FormEvent) => { e.preventDefault();