LLM session: async, 30min timeout, semantic chunking + parallel
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 1m28s

The claude_session bridge had two structural defects that made any
non-trivial document extraction unreliable:

  1. subprocess.run() blocks the asyncio event loop in the MCP server
     for the full duration of every LLM call (60-180s typical).
  2. The 120-second timeout was below the cold-cache cost of any
     document over ~12K Hebrew characters. Three back-to-back timeouts
     on case 8174-24 dropped 43 appellant claims on the floor.

Phase 1 of the remediation plan — keeps claude_session as the engine
(no Anthropic API switch) and restructures around it:

claude_session.py
  • query / query_json are now async — asyncio.create_subprocess_exec
    instead of subprocess.run, so MCP server can serve other coroutines
    while a call is in flight.
  • DEFAULT_TIMEOUT 120 → 1800 (30 min). High enough that no realistic
    document hits it; bounded so a runaway never zombifies forever.
  • LONG_TIMEOUT 300 → 3600 for opus block writing on full case context.
  • TimeoutError now actually kills the subprocess (asyncio.wait_for
    cancellation alone leaves the child running).

claims_extractor.py
  • _split_by_sections: chunks at numbered sections / Hebrew letter
    headings / "פרק" markers / markdown ##, falls back to paragraph
    breaks, then to hard splits. Targets 12K chars per chunk — small
    enough that each chunk reliably finishes inside the timeout.
  • _extract_chunk: per-chunk retry (1 attempt by default) with
    structured logging on failure. Failed chunks no longer crash the
    overall extraction; they're skipped with a partial-result warning.
  • extract_claims_with_ai now runs chunks in parallel via
    asyncio.gather bounded by a semaphore (CHUNK_CONCURRENCY=3).
    For a 25K-char appeal: was sequential 150-300s, now ~70-90s.

Updated all 9 callers (claims, appraiser facts, block writer, qa
validator, brainstorm, learning loop, style analyzer × 3) to await
the now-async API.

The one-shot scripts/extract_claims_8174.py used to recover 43
appellant claims on case 8174-24 has been moved to .archive/ — phase 1
makes it obsolete. SCRIPTS.md updated.

Phase 2 (background-task wrapper around LLM-bound MCP tools, persistent
llm_tasks table, SSE progress) is the structural follow-up — separate PR.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-30 14:21:35 +00:00
parent 9bdfb05350
commit 28f49defff
10 changed files with 329 additions and 82 deletions

View File

@@ -103,7 +103,7 @@ async def extract_facts_from_document(
f"שמאי: {appraiser_name}{chunk_label}\n\n"
f"--- תחילת שומה ---\n{chunk}\n--- סוף שומה ---"
)
result = claude_session.query_json(prompt, timeout=180)
result = await claude_session.query_json(prompt)
if not isinstance(result, list):
logger.warning(
"extract_facts_from_document: chunk %d returned non-list (%s) for doc=%s",

View File

@@ -380,7 +380,7 @@ async def write_block(
# Call Claude via Claude Code session (no API)
model_key = block_cfg["model"]
timeout = claude_session.LONG_TIMEOUT if model_key == "opus" else claude_session.DEFAULT_TIMEOUT
content = claude_session.query(prompt, timeout=timeout)
content = await claude_session.query(prompt, timeout=timeout)
return _build_result(block_id, content, block_cfg)

View File

@@ -134,14 +134,14 @@ async def generate_directions(
{doc_context or '(אין מסמכים בתיק)'}
"""
result = claude_session.query_json(user_content, timeout=120)
result = await claude_session.query_json(user_content)
if result is None:
logger.warning("Failed to parse brainstorm response: %s", raw[:300])
logger.warning("Failed to parse brainstorm response")
return {
"key_claims": [],
"directions": [],
"recommended_order": "",
"raw_response": raw,
"raw_response": "",
}
return result

View File

@@ -7,6 +7,7 @@
from __future__ import annotations
import asyncio
import logging
import re
from uuid import UUID
@@ -17,6 +18,21 @@ from legal_mcp.services import db, claude_session
logger = logging.getLogger(__name__)
# Each chunk targets ~12K chars (≈3K tokens of Hebrew). Smaller than the
# previous 25K because:
# • A single ``claude -p`` call on a 25K-char Hebrew prompt with cold
# cache routinely hit ~150-180s. 12K chunks finish in ~60-90s.
# • Per-chunk retry costs less when chunks are smaller.
# • Parallel chunks benefit more — see CHUNK_CONCURRENCY.
CHUNK_TARGET_CHARS = 12000
# How many chunks to send to Claude in parallel. Each subprocess holds
# ~300 MB RSS plus its own MCP stack; concurrency=3 keeps the box usable.
CHUNK_CONCURRENCY = 3
# How many retry attempts per failed chunk before giving up on it.
CHUNK_RETRY_ATTEMPTS = 1
EXTRACT_CLAIMS_PROMPT = """אתה מנתח מסמכים משפטיים בתחום תכנון ובניה. תפקידך לחלץ טענות מכתב טענות.
@@ -43,6 +59,103 @@ EXTRACT_CLAIMS_PROMPT = """אתה מנתח מסמכים משפטיים בתחו
"""
# Section markers we treat as natural chunk boundaries when present.
# Hebrew legal briefs almost always use numbered sections like "10." or
# letter-section headings (".א", ".ב"). Splitting between sections keeps
# every chunk a self-contained argumentative unit.
_SECTION_BOUNDARY_RE = re.compile(
r"\n\s*("
r"\d+\.\s+\S" # numbered section: "10. טענות"
r"|[א-ת]\.\s+\S" # Hebrew letter section: "א. רקע"
r"|##\s+\S" # markdown heading
r"|פרק\s+\S" # "פרק" headings
r")"
)
def _split_by_sections(text: str, target: int = CHUNK_TARGET_CHARS) -> list[str]:
"""Split a long document into roughly ``target``-sized chunks at section
boundaries. Falls back to paragraph breaks, then to hard splits if a
section happens to be larger than ``target`` on its own.
"""
if len(text) <= target:
return [text]
boundaries = [m.start() for m in _SECTION_BOUNDARY_RE.finditer(text)]
boundaries = [0, *boundaries, len(text)]
chunks: list[str] = []
start = 0
for cut in boundaries[1:]:
# Greedy: keep adding sections to the current chunk until adding
# the next one would push past ``target``.
if cut - start < target:
continue
end = cut
if end - start > target * 1.5:
# Section group exceeds 1.5× target — fall back to paragraph
# break inside it to avoid one chunk being far too big.
soft = text.rfind("\n\n", start, start + target)
if soft > start + target // 2:
end = soft
chunks.append(text[start:end].strip())
start = end
if start < len(text):
chunks.append(text[start:].strip())
# Hard splits for any chunk that is still too large (rare, but
# documents without any section markers can fall through).
final: list[str] = []
for c in chunks:
if len(c) <= target * 1.5:
final.append(c)
continue
for i in range(0, len(c), target):
final.append(c[i:i + target])
return [c for c in final if c.strip()]
async def _extract_chunk(
chunk: str,
chunk_index: int,
chunk_total: int,
context: str,
) -> tuple[int, list[dict] | None]:
"""Run extraction on one chunk with retry. Returns ``(chunk_index, claims_or_None)``.
None means the chunk failed both the initial call and every retry
(caller can use this to mark the result as partial).
"""
chunk_label = f" (חלק {chunk_index + 1}/{chunk_total})" if chunk_total > 1 else ""
prompt = (
f"{EXTRACT_CLAIMS_PROMPT}\n\n"
f"{context}{chunk_label}\n\n"
f"--- תחילת מסמך ---\n{chunk}\n--- סוף מסמך ---"
)
last_err: Exception | None = None
for attempt in range(CHUNK_RETRY_ATTEMPTS + 1):
try:
claims = await claude_session.query_json(prompt)
except Exception as e:
last_err = e
logger.warning(
"extract_claims chunk %d/%d attempt %d raised: %s",
chunk_index + 1, chunk_total, attempt + 1, e,
)
continue
if isinstance(claims, list):
return chunk_index, claims
logger.warning(
"extract_claims chunk %d/%d attempt %d returned non-list (%s)",
chunk_index + 1, chunk_total, attempt + 1, type(claims).__name__,
)
logger.error(
"extract_claims chunk %d/%d failed after %d attempts: %s",
chunk_index + 1, chunk_total, CHUNK_RETRY_ATTEMPTS + 1, last_err,
)
return chunk_index, None
async def extract_claims_with_ai(
text: str,
doc_type: str = "appeal",
@@ -50,68 +163,62 @@ async def extract_claims_with_ai(
) -> list[dict]:
"""חילוץ טענות מכתב טענות באמצעות Claude.
Splits ``text`` at section boundaries, runs every chunk through
Claude in parallel (bounded by ``CHUNK_CONCURRENCY``), retries each
failed chunk once, and merges the results in original document order.
Failed chunks are logged but don't block the overall extraction —
we return what we got and surface the gap via the logs.
Args:
text: טקסט המסמך
doc_type: סוג המסמך (appeal/response)
party_hint: רמז לזהות הצד (אם ידוע)
Returns:
רשימת טענות עם party_role, claim_text, topic
רשימת טענות עם party_role, claim_text, topic, claim_index.
"""
context = f"סוג המסמך: {doc_type}"
if party_hint:
context += f"\nהצד המגיש: {party_hint}"
# For very long documents, split into chunks and merge results
max_chars_per_call = 25000
chunks = []
if len(text) > max_chars_per_call:
# Split at paragraph boundaries
pos = 0
while pos < len(text):
end = min(pos + max_chars_per_call, len(text))
if end < len(text):
# Find paragraph break near the limit
break_pos = text.rfind("\n\n", pos, end)
if break_pos > pos + max_chars_per_call // 2:
end = break_pos
chunks.append(text[pos:end])
pos = end
logger.info("Document split into %d chunks (%d chars total)", len(chunks), len(text))
else:
chunks = [text]
all_claims = []
for i, chunk in enumerate(chunks):
chunk_label = f" (חלק {i+1}/{len(chunks)})" if len(chunks) > 1 else ""
prompt = (
f"{EXTRACT_CLAIMS_PROMPT}\n\n"
f"{context}{chunk_label}\n\n"
f"--- תחילת מסמך ---\n{chunk}\n--- סוף מסמך ---"
chunks = _split_by_sections(text)
if len(chunks) > 1:
logger.info(
"extract_claims: split %d chars into %d chunks (target=%d, concurrency=%d)",
len(text), len(chunks), CHUNK_TARGET_CHARS, CHUNK_CONCURRENCY,
)
claims = claude_session.query_json(prompt, timeout=120)
if claims is None:
logger.warning("Failed to parse claims for chunk %d: %s", i, raw[:200])
continue
if isinstance(claims, list):
all_claims.extend(claims)
claims = all_claims
sem = asyncio.Semaphore(CHUNK_CONCURRENCY)
async def _bounded(idx: int, c: str) -> tuple[int, list[dict] | None]:
async with sem:
return await _extract_chunk(c, idx, len(chunks), context)
results = await asyncio.gather(*[_bounded(i, c) for i, c in enumerate(chunks)])
# Merge in original order. Skip chunks that failed entirely.
failed = [i for i, r in results if r is None]
if failed:
logger.warning(
"extract_claims: %d/%d chunks failed (indices=%s) — returning partial result",
len(failed), len(chunks), failed,
)
merged: list[dict] = []
for idx, claims in sorted(results, key=lambda x: x[0]):
if not claims:
return []
continue
merged.extend(claims)
if not isinstance(claims, list):
return []
# Add claim_index
for i, claim in enumerate(claims):
claim["claim_index"] = i
# Validate required fields
# Add claim_index and drop entries missing required fields.
cleaned: list[dict] = []
for i, claim in enumerate(merged):
if not isinstance(claim, dict):
continue
if "party_role" not in claim or "claim_text" not in claim:
continue
return [c for c in claims if "party_role" in c and "claim_text" in c]
claim["claim_index"] = i
cleaned.append(claim)
return cleaned
def _infer_claim_type(doc_type: str, source_name: str) -> str:

View File

@@ -1,27 +1,41 @@
"""Claude Code session bridge — runs prompts via `claude -p` instead of API.
All LLM calls in the project should use this module instead of calling
the Anthropic API directly. This uses the local Claude Code CLI which
runs on the user's claude.ai session — zero API cost.
All LLM calls in the project go through this module. We shell out to the
local Claude Code CLI which uses the developer's claude.ai session — zero
direct Anthropic API cost.
History: this module was originally synchronous (``subprocess.run``) with
a 120-second timeout. That broke for large legal documents:
1. Sync subprocess stalled the asyncio event loop in the MCP server
while a single LLM call was in flight.
2. 120 seconds was far too short. A 25K-character Hebrew appeal on cold
prompt cache routinely takes 130-180 seconds; we proved this in case
8174-24 (three timeouts in a row).
The fix: switch to async subprocess (non-blocking) and raise the default
ceiling to 30 minutes — long enough that no realistic document hits it,
but bounded so a runaway never zombifies forever.
"""
from __future__ import annotations
import asyncio
import json
import logging
import subprocess
from pathlib import Path
from legal_mcp.config import parse_llm_json
logger = logging.getLogger(__name__)
# Default timeout for claude -p calls (seconds)
DEFAULT_TIMEOUT = 120
LONG_TIMEOUT = 300 # For complex tasks like block writing
# Default ceiling for any single ``claude -p`` invocation, in seconds.
# 30 min covers any single-document call we make in practice (chunking
# handles the rest); the bound exists only to prevent runaway zombies.
DEFAULT_TIMEOUT = 1800
LONG_TIMEOUT = 3600 # opus block writing on full case context
def query(prompt: str, timeout: int = DEFAULT_TIMEOUT, max_turns: int = 1) -> str:
async def query(prompt: str, timeout: int = DEFAULT_TIMEOUT, max_turns: int = 1) -> str:
"""Send a prompt to Claude Code headless and return the text response.
Passes the prompt via stdin (not argv) to avoid the OS ARG_MAX limit —
@@ -29,14 +43,14 @@ def query(prompt: str, timeout: int = DEFAULT_TIMEOUT, max_turns: int = 1) -> st
Args:
prompt: The prompt to send.
timeout: Max seconds to wait.
timeout: Max seconds before the subprocess is killed.
max_turns: Max conversation turns (1 = single response).
Returns:
The text response from Claude.
Raises:
RuntimeError: If claude CLI is not available or fails.
RuntimeError: If claude CLI is not available, fails, or times out.
"""
cmd = [
"claude", "-p",
@@ -45,23 +59,34 @@ def query(prompt: str, timeout: int = DEFAULT_TIMEOUT, max_turns: int = 1) -> st
]
try:
result = subprocess.run(
cmd,
input=prompt,
capture_output=True,
text=True,
timeout=timeout,
proc = await asyncio.create_subprocess_exec(
*cmd,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
except FileNotFoundError:
raise RuntimeError("Claude CLI not found. Install Claude Code or add 'claude' to PATH.")
except subprocess.TimeoutExpired:
try:
stdout_b, stderr_b = await asyncio.wait_for(
proc.communicate(input=prompt.encode("utf-8")),
timeout=timeout,
)
except asyncio.TimeoutError:
# wait_for cancellation alone leaves the child running.
try:
proc.kill()
await proc.wait()
except ProcessLookupError:
pass
raise RuntimeError(f"Claude CLI timed out after {timeout}s")
if result.returncode != 0:
stderr = result.stderr.strip()[:500] if result.stderr else "unknown error"
raise RuntimeError(f"Claude CLI failed (exit {result.returncode}): {stderr}")
if proc.returncode != 0:
stderr = stderr_b.decode("utf-8", errors="replace").strip()[:500] or "unknown error"
raise RuntimeError(f"Claude CLI failed (exit {proc.returncode}): {stderr}")
stdout = result.stdout.strip()
stdout = stdout_b.decode("utf-8", errors="replace").strip()
if not stdout:
raise RuntimeError("Claude CLI returned empty response")
@@ -75,10 +100,10 @@ def query(prompt: str, timeout: int = DEFAULT_TIMEOUT, max_turns: int = 1) -> st
return stdout
def query_json(prompt: str, timeout: int = DEFAULT_TIMEOUT) -> dict | list | None:
async def query_json(prompt: str, timeout: int = DEFAULT_TIMEOUT) -> dict | list | None:
"""Send a prompt and parse the response as JSON.
Uses parse_llm_json for robust parsing (handles markdown wrapping, truncation).
"""
raw = query(prompt, timeout=timeout)
raw = await query(prompt, timeout=timeout)
return parse_llm_json(raw)

View File

@@ -90,10 +90,10 @@ async def analyze_changes(draft_text: str, final_text: str) -> dict:
--- גרסה סופית ---
{final_sample}
"""
result = claude_session.query_json(prompt, timeout=120)
result = await claude_session.query_json(prompt)
if result is None:
logger.warning("Failed to parse lessons response")
return {"changes": [], "new_expressions": [], "overall_assessment": raw[:200]}
return {"changes": [], "new_expressions": [], "overall_assessment": ""}
return result

View File

@@ -144,9 +144,9 @@ async def check_claims_coverage(blocks: list[dict], claims: list[dict]) -> dict:
## בלוק הדיון:
{discussion}"""
parsed = claude_session.query_json(prompt, timeout=120)
parsed = await claude_session.query_json(prompt)
if parsed is None:
logger.warning("Failed to parse claims check: %s", raw[:300])
logger.warning("Failed to parse claims check")
# Fallback: assume all covered (don't block export on parse failure)
return {"name": "claims_coverage", "passed": True,
"errors": ["שגיאה בפענוח תוצאות — לא ניתן לבדוק"], "severity": "warning"}

View File

@@ -159,7 +159,7 @@ async def _analyze_single_pass(rows, appeal_subtype: str = "") -> dict:
decisions_text += f"\n\n--- החלטה {row['decision_number'] or 'ללא מספר'} ---\n"
decisions_text += row["full_text"]
raw = claude_session.query(
raw = await claude_session.query(
ANALYSIS_PROMPT.format(decisions=decisions_text),
timeout=claude_session.LONG_TIMEOUT,
)
@@ -176,7 +176,7 @@ async def _analyze_multi_pass(rows, appeal_subtype: str = "") -> dict:
decision_text = f"--- החלטה {row['decision_number'] or 'ללא מספר'} ---\n"
decision_text += row["full_text"]
raw = claude_session.query(
raw = await claude_session.query(
SINGLE_DECISION_PROMPT.format(decision=decision_text),
timeout=claude_session.LONG_TIMEOUT,
)
@@ -189,7 +189,7 @@ async def _analyze_multi_pass(rows, appeal_subtype: str = "") -> dict:
return {"error": "לא הצלחתי לחלץ דפוסים מההחלטות"}
# Pass 2: Synthesize across all decisions
raw = claude_session.query(
raw = await claude_session.query(
SYNTHESIS_PROMPT.format(
num_decisions=len(rows),
patterns=json.dumps(all_patterns, ensure_ascii=False, indent=2),

View File

@@ -0,0 +1,114 @@
#!/usr/bin/env python3
"""One-shot: extract appellant claims for case 8174-24.
The analyst (CMPA-13) finished but `extract_claims` timed out three times on
the main 25K-char appeal document, so we have only 19 committee/response
claims in DB and zero appellant claims. This script reruns extraction with
a higher timeout and parallel chunks.
Targets:
• כתב ערר 18.12.24 (appeal, 25,474 chars) — appellant claims
• השלמת מסמכים תמ״א 38 (decision, 3,718 chars) — supplementary appeal filing
After phase 1.1-1.3 lands, this script becomes obsolete.
Usage: /home/chaim/legal-ai/mcp-server/.venv/bin/python scripts/extract_claims_8174.py
"""
from __future__ import annotations
import asyncio
import json
import sys
import time
from pathlib import Path
from uuid import UUID
# Ensure we can import legal_mcp from this repo's mcp-server tree
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "mcp-server" / "src"))
from legal_mcp.services import claims_extractor, claude_session, db
# ── Patch claude_session to use 30-min ceiling ───────────────────────
# The hard-coded timeout=120 in claims_extractor.extract_claims_with_ai is
# what kept failing. Force every claude_session call here to use 1800s.
_orig_query_json = claude_session.query_json
_orig_query = claude_session.query
def _patched_query_json(prompt: str, timeout: int = 120):
return _orig_query_json(prompt, timeout=max(timeout, 1800))
def _patched_query(prompt: str, timeout: int = 120, max_turns: int = 1):
return _orig_query(prompt, timeout=max(timeout, 1800), max_turns=max_turns)
claude_session.query_json = _patched_query_json
claude_session.query = _patched_query
CASE_NUMBER = "8174-24"
TARGETS = [
# (doc_id, title hint, doc_type override, party_hint)
("655f96f7-d406-44ac-bb53-6b2c1ab2909c", "כתב ערר 18.12.24", "appeal", "יואל גולדמן"),
("13b4795a-4fb7-460e-bddf-a5d282a1a67f", "השלמת מסמכים תמ״א 38", "appeal", "יואל גולדמן"),
]
async def main() -> int:
case = await db.get_case_by_number(CASE_NUMBER)
if not case:
print(f"ERROR: case {CASE_NUMBER} not found")
return 1
case_id = UUID(case["id"])
print(f"=== Case {CASE_NUMBER}{case['title']} ===")
print()
for doc_id, label, doc_type, party_hint in TARGETS:
text = await db.get_document_text(UUID(doc_id))
if not text:
print(f"SKIP {label} — no extracted_text")
continue
chars = len(text)
print(f"--- {label} ({chars:,} chars, doc_type={doc_type}) ---")
t0 = time.monotonic()
try:
result = await claims_extractor.extract_and_store_claims(
case_id=case_id,
document_id=UUID(doc_id),
text=text,
doc_type=doc_type,
party_hint=party_hint,
)
except Exception as e:
print(f" FAILED: {e}")
continue
dt = time.monotonic() - t0
print(f" done in {dt:.1f}s — {json.dumps(result, ensure_ascii=False)}")
print()
# Final tally
pool = await db.get_pool()
async with pool.acquire() as conn:
rows = await conn.fetch(
"""SELECT party_role, claim_type, source_document, count(*) as n
FROM claims WHERE case_id = $1
GROUP BY 1, 2, 3 ORDER BY 1, 3""",
case_id,
)
print("=== Final claims breakdown ===")
total = 0
for r in rows:
n = r["n"]
total += n
print(f" {r['party_role']:12} {r['claim_type']:10} ({n:3}) ← {r['source_document']}")
print(f" TOTAL: {total} claims")
return 0
if __name__ == "__main__":
sys.exit(asyncio.run(main()))

View File

@@ -32,6 +32,7 @@
| `export-decision-docx.py` | ייצוא החלטה ל-DOCX | MCP: `export_docx()` |
| `extract-citations.py` | חילוץ ציטוטי פסיקה מבלוק י | MCP service: `references_extractor.py` |
| `extract-claims.py` | חילוץ טענות מבלוק ז | MCP: `extract_claims()` + `claims_extractor.py` |
| `extract_claims_8174.py` | חד-פעמי — חילוץ טענות חסרות לתיק 8174-24 אחרי timeout של האנליסט (43 טענות עורר נוספו 30/04/26) | phase 1: `claude_session` async + 30min timeout + chunking סמנטי |
| `extract_all_google_vision.py` | OCR בכמות עם Google Vision | MCP: `document_upload()` pipeline |
| `extract_originals.py` | חילוץ טקסט מ-PDF עם Claude Opus | MCP service: `extractor.py` |
| `extract_originals_ocr.py` | חילוץ OCR מלא מ-PDF | MCP service: `extractor.py` |