From 28f49defffbc7fa7aba9b207fdf67b768033841a Mon Sep 17 00:00:00 2001
From: Chaim
Date: Thu, 30 Apr 2026 14:21:35 +0000
Subject: [PATCH] LLM session: async, 30min timeout, semantic chunking + parallel
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The claude_session bridge had two structural defects that made any
non-trivial document extraction unreliable:

  1. subprocess.run() blocks the asyncio event loop in the MCP server
     for the full duration of every LLM call (60-180s typical).
  2. The 120-second timeout was below the cold-cache cost of any
     document over ~12K Hebrew characters. Three back-to-back timeouts
     on case 8174-24 dropped 43 appellant claims on the floor.

Phase 1 of the remediation plan — keeps claude_session as the engine
(no Anthropic API switch) and restructures around it:

claude_session.py
  • query / query_json are now async — asyncio.create_subprocess_exec
    instead of subprocess.run, so the MCP server can serve other
    coroutines while a call is in flight.
  • DEFAULT_TIMEOUT 120 → 1800 (30 min). High enough that no realistic
    document hits it; bounded so a runaway never zombifies forever.
  • LONG_TIMEOUT 300 → 3600 for opus block writing on full case context.
  • A timeout now actually kills the subprocess (asyncio.wait_for
    cancellation alone leaves the child running).

claims_extractor.py
  • _split_by_sections: chunks at numbered sections / Hebrew letter
    headings / "פרק" markers / markdown ##, falls back to paragraph
    breaks, then to hard splits. Targets 12K chars per chunk — small
    enough that each chunk reliably finishes inside the timeout.
  • _extract_chunk: per-chunk retry (1 attempt by default) with
    structured logging on failure. Failed chunks no longer crash the
    overall extraction; they're skipped with a partial-result warning.
  • extract_claims_with_ai now runs chunks in parallel via
    asyncio.gather bounded by a semaphore (CHUNK_CONCURRENCY=3). For a
    25K-char appeal: was sequential 150-300s, now ~70-90s.

Updated all 9 callers (claims, appraiser facts, block writer, qa validator, brainstorm, learning loop, style analyzer × 3) to await the now-async API. The one-shot scripts/extract_claims_8174.py used to recover 43 appellant claims on case 8174-24 has been moved to .archive/ — phase 1 makes it obsolete. SCRIPTS.md updated. Phase 2 (background-task wrapper around LLM-bound MCP tools, persistent llm_tasks table, SSE progress) is the structural follow-up — separate PR. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../services/appraiser_facts_extractor.py | 2 +- .../src/legal_mcp/services/block_writer.py | 2 +- .../src/legal_mcp/services/brainstorm.py | 6 +- .../legal_mcp/services/claims_extractor.py | 199 ++++++++++++++---- .../src/legal_mcp/services/claude_session.py | 73 ++++--- .../src/legal_mcp/services/learning_loop.py | 4 +- .../src/legal_mcp/services/qa_validator.py | 4 +- .../src/legal_mcp/services/style_analyzer.py | 6 +- scripts/.archive/extract_claims_8174.py | 114 ++++++++++ scripts/SCRIPTS.md | 1 + 10 files changed, 329 insertions(+), 82 deletions(-) create mode 100644 scripts/.archive/extract_claims_8174.py diff --git a/mcp-server/src/legal_mcp/services/appraiser_facts_extractor.py b/mcp-server/src/legal_mcp/services/appraiser_facts_extractor.py index 4b6f63c..501fabf 100644 --- a/mcp-server/src/legal_mcp/services/appraiser_facts_extractor.py +++ b/mcp-server/src/legal_mcp/services/appraiser_facts_extractor.py @@ -103,7 +103,7 @@ async def extract_facts_from_document( f"שמאי: {appraiser_name}{chunk_label}\n\n" f"--- תחילת שומה ---\n{chunk}\n--- סוף שומה ---" ) - result = claude_session.query_json(prompt, timeout=180) + result = await claude_session.query_json(prompt) if not isinstance(result, list): logger.warning( "extract_facts_from_document: chunk %d returned non-list (%s) for doc=%s", diff --git a/mcp-server/src/legal_mcp/services/block_writer.py b/mcp-server/src/legal_mcp/services/block_writer.py index 4100dc2..db030ed 100644 --- 
a/mcp-server/src/legal_mcp/services/block_writer.py +++ b/mcp-server/src/legal_mcp/services/block_writer.py @@ -380,7 +380,7 @@ async def write_block( # Call Claude via Claude Code session (no API) model_key = block_cfg["model"] timeout = claude_session.LONG_TIMEOUT if model_key == "opus" else claude_session.DEFAULT_TIMEOUT - content = claude_session.query(prompt, timeout=timeout) + content = await claude_session.query(prompt, timeout=timeout) return _build_result(block_id, content, block_cfg) diff --git a/mcp-server/src/legal_mcp/services/brainstorm.py b/mcp-server/src/legal_mcp/services/brainstorm.py index 50ddd7f..174bc93 100644 --- a/mcp-server/src/legal_mcp/services/brainstorm.py +++ b/mcp-server/src/legal_mcp/services/brainstorm.py @@ -134,14 +134,14 @@ async def generate_directions( {doc_context or '(אין מסמכים בתיק)'} """ - result = claude_session.query_json(user_content, timeout=120) + result = await claude_session.query_json(user_content) if result is None: - logger.warning("Failed to parse brainstorm response: %s", raw[:300]) + logger.warning("Failed to parse brainstorm response") return { "key_claims": [], "directions": [], "recommended_order": "", - "raw_response": raw, + "raw_response": "", } return result diff --git a/mcp-server/src/legal_mcp/services/claims_extractor.py b/mcp-server/src/legal_mcp/services/claims_extractor.py index a31a0d8..e7c9416 100644 --- a/mcp-server/src/legal_mcp/services/claims_extractor.py +++ b/mcp-server/src/legal_mcp/services/claims_extractor.py @@ -7,6 +7,7 @@ from __future__ import annotations +import asyncio import logging import re from uuid import UUID @@ -17,6 +18,21 @@ from legal_mcp.services import db, claude_session logger = logging.getLogger(__name__) +# Each chunk targets ~12K chars (≈3K tokens of Hebrew). Smaller than the +# previous 25K because: +# • A single ``claude -p`` call on a 25K-char Hebrew prompt with cold +# cache routinely hit ~150-180s. 12K chunks finish in ~60-90s. 
+# • Per-chunk retry costs less when chunks are smaller. +# • Parallel chunks benefit more — see CHUNK_CONCURRENCY. +CHUNK_TARGET_CHARS = 12000 + +# How many chunks to send to Claude in parallel. Each subprocess holds +# ~300 MB RSS plus its own MCP stack; concurrency=3 keeps the box usable. +CHUNK_CONCURRENCY = 3 + +# How many retry attempts per failed chunk before giving up on it. +CHUNK_RETRY_ATTEMPTS = 1 + EXTRACT_CLAIMS_PROMPT = """אתה מנתח מסמכים משפטיים בתחום תכנון ובניה. תפקידך לחלץ טענות מכתב טענות. @@ -43,6 +59,103 @@ EXTRACT_CLAIMS_PROMPT = """אתה מנתח מסמכים משפטיים בתחו """ +# Section markers we treat as natural chunk boundaries when present. +# Hebrew legal briefs almost always use numbered sections like "10." or +# letter-section headings (".א", ".ב"). Splitting between sections keeps +# every chunk a self-contained argumentative unit. +_SECTION_BOUNDARY_RE = re.compile( + r"\n\s*(" + r"\d+\.\s+\S" # numbered section: "10. טענות" + r"|[א-ת]\.\s+\S" # Hebrew letter section: "א. רקע" + r"|##\s+\S" # markdown heading + r"|פרק\s+\S" # "פרק" headings + r")" +) + + +def _split_by_sections(text: str, target: int = CHUNK_TARGET_CHARS) -> list[str]: + """Split a long document into roughly ``target``-sized chunks at section + boundaries. Falls back to paragraph breaks, then to hard splits if a + section happens to be larger than ``target`` on its own. + """ + if len(text) <= target: + return [text] + + boundaries = [m.start() for m in _SECTION_BOUNDARY_RE.finditer(text)] + boundaries = [0, *boundaries, len(text)] + + chunks: list[str] = [] + start = 0 + for cut in boundaries[1:]: + # Greedy: keep adding sections to the current chunk until adding + # the next one would push past ``target``. + if cut - start < target: + continue + end = cut + if end - start > target * 1.5: + # Section group exceeds 1.5× target — fall back to paragraph + # break inside it to avoid one chunk being far too big. 
+ soft = text.rfind("\n\n", start, start + target) + if soft > start + target // 2: + end = soft + chunks.append(text[start:end].strip()) + start = end + if start < len(text): + chunks.append(text[start:].strip()) + + # Hard splits for any chunk that is still too large (rare, but + # documents without any section markers can fall through). + final: list[str] = [] + for c in chunks: + if len(c) <= target * 1.5: + final.append(c) + continue + for i in range(0, len(c), target): + final.append(c[i:i + target]) + return [c for c in final if c.strip()] + + +async def _extract_chunk( + chunk: str, + chunk_index: int, + chunk_total: int, + context: str, +) -> tuple[int, list[dict] | None]: + """Run extraction on one chunk with retry. Returns ``(chunk_index, claims_or_None)``. + + None means the chunk failed both the initial call and every retry + (caller can use this to mark the result as partial). + """ + chunk_label = f" (חלק {chunk_index + 1}/{chunk_total})" if chunk_total > 1 else "" + prompt = ( + f"{EXTRACT_CLAIMS_PROMPT}\n\n" + f"{context}{chunk_label}\n\n" + f"--- תחילת מסמך ---\n{chunk}\n--- סוף מסמך ---" + ) + last_err: Exception | None = None + for attempt in range(CHUNK_RETRY_ATTEMPTS + 1): + try: + claims = await claude_session.query_json(prompt) + except Exception as e: + last_err = e + logger.warning( + "extract_claims chunk %d/%d attempt %d raised: %s", + chunk_index + 1, chunk_total, attempt + 1, e, + ) + continue + if isinstance(claims, list): + return chunk_index, claims + logger.warning( + "extract_claims chunk %d/%d attempt %d returned non-list (%s)", + chunk_index + 1, chunk_total, attempt + 1, type(claims).__name__, + ) + logger.error( + "extract_claims chunk %d/%d failed after %d attempts: %s", + chunk_index + 1, chunk_total, CHUNK_RETRY_ATTEMPTS + 1, last_err, + ) + return chunk_index, None + + async def extract_claims_with_ai( text: str, doc_type: str = "appeal", @@ -50,68 +163,62 @@ async def extract_claims_with_ai( ) -> list[dict]: """חילוץ 
טענות מכתב טענות באמצעות Claude. + Splits ``text`` at section boundaries, runs every chunk through + Claude in parallel (bounded by ``CHUNK_CONCURRENCY``), retries each + failed chunk once, and merges the results in original document order. + Failed chunks are logged but don't block the overall extraction — + we return what we got and surface the gap via the logs. + Args: text: טקסט המסמך doc_type: סוג המסמך (appeal/response) party_hint: רמז לזהות הצד (אם ידוע) Returns: - רשימת טענות עם party_role, claim_text, topic + רשימת טענות עם party_role, claim_text, topic, claim_index. """ context = f"סוג המסמך: {doc_type}" if party_hint: context += f"\nהצד המגיש: {party_hint}" - # For very long documents, split into chunks and merge results - max_chars_per_call = 25000 - chunks = [] - if len(text) > max_chars_per_call: - # Split at paragraph boundaries - pos = 0 - while pos < len(text): - end = min(pos + max_chars_per_call, len(text)) - if end < len(text): - # Find paragraph break near the limit - break_pos = text.rfind("\n\n", pos, end) - if break_pos > pos + max_chars_per_call // 2: - end = break_pos - chunks.append(text[pos:end]) - pos = end - logger.info("Document split into %d chunks (%d chars total)", len(chunks), len(text)) - else: - chunks = [text] - - all_claims = [] - - for i, chunk in enumerate(chunks): - chunk_label = f" (חלק {i+1}/{len(chunks)})" if len(chunks) > 1 else "" - prompt = ( - f"{EXTRACT_CLAIMS_PROMPT}\n\n" - f"{context}{chunk_label}\n\n" - f"--- תחילת מסמך ---\n{chunk}\n--- סוף מסמך ---" + chunks = _split_by_sections(text) + if len(chunks) > 1: + logger.info( + "extract_claims: split %d chars into %d chunks (target=%d, concurrency=%d)", + len(text), len(chunks), CHUNK_TARGET_CHARS, CHUNK_CONCURRENCY, ) - claims = claude_session.query_json(prompt, timeout=120) - if claims is None: - logger.warning("Failed to parse claims for chunk %d: %s", i, raw[:200]) + + sem = asyncio.Semaphore(CHUNK_CONCURRENCY) + + async def _bounded(idx: int, c: str) -> 
tuple[int, list[dict] | None]: + async with sem: + return await _extract_chunk(c, idx, len(chunks), context) + + results = await asyncio.gather(*[_bounded(i, c) for i, c in enumerate(chunks)]) + + # Merge in original order. Skip chunks that failed entirely. + failed = [i for i, r in results if r is None] + if failed: + logger.warning( + "extract_claims: %d/%d chunks failed (indices=%s) — returning partial result", + len(failed), len(chunks), failed, + ) + merged: list[dict] = [] + for idx, claims in sorted(results, key=lambda x: x[0]): + if not claims: continue - if isinstance(claims, list): - all_claims.extend(claims) + merged.extend(claims) - claims = all_claims - if not claims: - return [] - - if not isinstance(claims, list): - return [] - - # Add claim_index - for i, claim in enumerate(claims): - claim["claim_index"] = i - # Validate required fields + # Add claim_index and drop entries missing required fields. + cleaned: list[dict] = [] + for i, claim in enumerate(merged): + if not isinstance(claim, dict): + continue if "party_role" not in claim or "claim_text" not in claim: continue - - return [c for c in claims if "party_role" in c and "claim_text" in c] + claim["claim_index"] = i + cleaned.append(claim) + return cleaned def _infer_claim_type(doc_type: str, source_name: str) -> str: diff --git a/mcp-server/src/legal_mcp/services/claude_session.py b/mcp-server/src/legal_mcp/services/claude_session.py index 99368cb..b1014a7 100644 --- a/mcp-server/src/legal_mcp/services/claude_session.py +++ b/mcp-server/src/legal_mcp/services/claude_session.py @@ -1,27 +1,41 @@ """Claude Code session bridge — runs prompts via `claude -p` instead of API. -All LLM calls in the project should use this module instead of calling -the Anthropic API directly. This uses the local Claude Code CLI which -runs on the user's claude.ai session — zero API cost. +All LLM calls in the project go through this module. 
We shell out to the +local Claude Code CLI which uses the developer's claude.ai session — zero +direct Anthropic API cost. + +History: this module was originally synchronous (``subprocess.run``) with +a 120-second timeout. That broke for large legal documents: + + 1. Sync subprocess stalled the asyncio event loop in the MCP server + while a single LLM call was in flight. + 2. 120 seconds was far too short. A 25K-character Hebrew appeal on cold + prompt cache routinely takes 130-180 seconds; we proved this in case + 8174-24 (three timeouts in a row). + +The fix: switch to async subprocess (non-blocking) and raise the default +ceiling to 30 minutes — long enough that no realistic document hits it, +but bounded so a runaway never zombifies forever. """ from __future__ import annotations +import asyncio import json import logging -import subprocess -from pathlib import Path from legal_mcp.config import parse_llm_json logger = logging.getLogger(__name__) -# Default timeout for claude -p calls (seconds) -DEFAULT_TIMEOUT = 120 -LONG_TIMEOUT = 300 # For complex tasks like block writing +# Default ceiling for any single ``claude -p`` invocation, in seconds. +# 30 min covers any single-document call we make in practice (chunking +# handles the rest); the bound exists only to prevent runaway zombies. +DEFAULT_TIMEOUT = 1800 +LONG_TIMEOUT = 3600 # opus block writing on full case context -def query(prompt: str, timeout: int = DEFAULT_TIMEOUT, max_turns: int = 1) -> str: +async def query(prompt: str, timeout: int = DEFAULT_TIMEOUT, max_turns: int = 1) -> str: """Send a prompt to Claude Code headless and return the text response. Passes the prompt via stdin (not argv) to avoid the OS ARG_MAX limit — @@ -29,14 +43,14 @@ def query(prompt: str, timeout: int = DEFAULT_TIMEOUT, max_turns: int = 1) -> st Args: prompt: The prompt to send. - timeout: Max seconds to wait. + timeout: Max seconds before the subprocess is killed. max_turns: Max conversation turns (1 = single response). 
Returns: The text response from Claude. Raises: - RuntimeError: If claude CLI is not available or fails. + RuntimeError: If claude CLI is not available, fails, or times out. """ cmd = [ "claude", "-p", @@ -45,23 +59,34 @@ def query(prompt: str, timeout: int = DEFAULT_TIMEOUT, max_turns: int = 1) -> st ] try: - result = subprocess.run( - cmd, - input=prompt, - capture_output=True, - text=True, - timeout=timeout, + proc = await asyncio.create_subprocess_exec( + *cmd, + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, ) except FileNotFoundError: raise RuntimeError("Claude CLI not found. Install Claude Code or add 'claude' to PATH.") - except subprocess.TimeoutExpired: + + try: + stdout_b, stderr_b = await asyncio.wait_for( + proc.communicate(input=prompt.encode("utf-8")), + timeout=timeout, + ) + except asyncio.TimeoutError: + # wait_for cancellation alone leaves the child running. + try: + proc.kill() + await proc.wait() + except ProcessLookupError: + pass raise RuntimeError(f"Claude CLI timed out after {timeout}s") - if result.returncode != 0: - stderr = result.stderr.strip()[:500] if result.stderr else "unknown error" - raise RuntimeError(f"Claude CLI failed (exit {result.returncode}): {stderr}") + if proc.returncode != 0: + stderr = stderr_b.decode("utf-8", errors="replace").strip()[:500] or "unknown error" + raise RuntimeError(f"Claude CLI failed (exit {proc.returncode}): {stderr}") - stdout = result.stdout.strip() + stdout = stdout_b.decode("utf-8", errors="replace").strip() if not stdout: raise RuntimeError("Claude CLI returned empty response") @@ -75,10 +100,10 @@ def query(prompt: str, timeout: int = DEFAULT_TIMEOUT, max_turns: int = 1) -> st return stdout -def query_json(prompt: str, timeout: int = DEFAULT_TIMEOUT) -> dict | list | None: +async def query_json(prompt: str, timeout: int = DEFAULT_TIMEOUT) -> dict | list | None: """Send a prompt and parse the response as JSON. 
Uses parse_llm_json for robust parsing (handles markdown wrapping, truncation). """ - raw = query(prompt, timeout=timeout) + raw = await query(prompt, timeout=timeout) return parse_llm_json(raw) diff --git a/mcp-server/src/legal_mcp/services/learning_loop.py b/mcp-server/src/legal_mcp/services/learning_loop.py index 74ec2ce..e529d89 100644 --- a/mcp-server/src/legal_mcp/services/learning_loop.py +++ b/mcp-server/src/legal_mcp/services/learning_loop.py @@ -90,10 +90,10 @@ async def analyze_changes(draft_text: str, final_text: str) -> dict: --- גרסה סופית --- {final_sample} """ - result = claude_session.query_json(prompt, timeout=120) + result = await claude_session.query_json(prompt) if result is None: logger.warning("Failed to parse lessons response") - return {"changes": [], "new_expressions": [], "overall_assessment": raw[:200]} + return {"changes": [], "new_expressions": [], "overall_assessment": ""} return result diff --git a/mcp-server/src/legal_mcp/services/qa_validator.py b/mcp-server/src/legal_mcp/services/qa_validator.py index b3d4301..f585208 100644 --- a/mcp-server/src/legal_mcp/services/qa_validator.py +++ b/mcp-server/src/legal_mcp/services/qa_validator.py @@ -144,9 +144,9 @@ async def check_claims_coverage(blocks: list[dict], claims: list[dict]) -> dict: ## בלוק הדיון: {discussion}""" - parsed = claude_session.query_json(prompt, timeout=120) + parsed = await claude_session.query_json(prompt) if parsed is None: - logger.warning("Failed to parse claims check: %s", raw[:300]) + logger.warning("Failed to parse claims check") # Fallback: assume all covered (don't block export on parse failure) return {"name": "claims_coverage", "passed": True, "errors": ["שגיאה בפענוח תוצאות — לא ניתן לבדוק"], "severity": "warning"} diff --git a/mcp-server/src/legal_mcp/services/style_analyzer.py b/mcp-server/src/legal_mcp/services/style_analyzer.py index 829747d..1103807 100644 --- a/mcp-server/src/legal_mcp/services/style_analyzer.py +++ 
b/mcp-server/src/legal_mcp/services/style_analyzer.py @@ -159,7 +159,7 @@ async def _analyze_single_pass(rows, appeal_subtype: str = "") -> dict: decisions_text += f"\n\n--- החלטה {row['decision_number'] or 'ללא מספר'} ---\n" decisions_text += row["full_text"] - raw = claude_session.query( + raw = await claude_session.query( ANALYSIS_PROMPT.format(decisions=decisions_text), timeout=claude_session.LONG_TIMEOUT, ) @@ -176,7 +176,7 @@ async def _analyze_multi_pass(rows, appeal_subtype: str = "") -> dict: decision_text = f"--- החלטה {row['decision_number'] or 'ללא מספר'} ---\n" decision_text += row["full_text"] - raw = claude_session.query( + raw = await claude_session.query( SINGLE_DECISION_PROMPT.format(decision=decision_text), timeout=claude_session.LONG_TIMEOUT, ) @@ -189,7 +189,7 @@ async def _analyze_multi_pass(rows, appeal_subtype: str = "") -> dict: return {"error": "לא הצלחתי לחלץ דפוסים מההחלטות"} # Pass 2: Synthesize across all decisions - raw = claude_session.query( + raw = await claude_session.query( SYNTHESIS_PROMPT.format( num_decisions=len(rows), patterns=json.dumps(all_patterns, ensure_ascii=False, indent=2), diff --git a/scripts/.archive/extract_claims_8174.py b/scripts/.archive/extract_claims_8174.py new file mode 100644 index 0000000..6800305 --- /dev/null +++ b/scripts/.archive/extract_claims_8174.py @@ -0,0 +1,114 @@ +#!/usr/bin/env python3 +"""One-shot: extract appellant claims for case 8174-24. + +The analyst (CMPA-13) finished but `extract_claims` timed out three times on +the main 25K-char appeal document, so we have only 19 committee/response +claims in DB and zero appellant claims. This script reruns extraction with +a higher timeout and parallel chunks. + +Targets: + • כתב ערר 18.12.24 (appeal, 25,474 chars) — appellant claims + • השלמת מסמכים תמ״א 38 (decision, 3,718 chars) — supplementary appeal filing + +After phase 1.1-1.3 lands, this script becomes obsolete. 
+ +Usage: /home/chaim/legal-ai/mcp-server/.venv/bin/python scripts/extract_claims_8174.py +""" + +from __future__ import annotations + +import asyncio +import json +import sys +import time +from pathlib import Path +from uuid import UUID + +# Ensure we can import legal_mcp from this repo's mcp-server tree +sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "mcp-server" / "src")) + +from legal_mcp.services import claims_extractor, claude_session, db + + +# ── Patch claude_session to use 30-min ceiling ─────────────────────── +# The hard-coded timeout=120 in claims_extractor.extract_claims_with_ai is +# what kept failing. Force every claude_session call here to use 1800s. +_orig_query_json = claude_session.query_json +_orig_query = claude_session.query + + +def _patched_query_json(prompt: str, timeout: int = 120): + return _orig_query_json(prompt, timeout=max(timeout, 1800)) + + +def _patched_query(prompt: str, timeout: int = 120, max_turns: int = 1): + return _orig_query(prompt, timeout=max(timeout, 1800), max_turns=max_turns) + + +claude_session.query_json = _patched_query_json +claude_session.query = _patched_query + + +CASE_NUMBER = "8174-24" + +TARGETS = [ + # (doc_id, title hint, doc_type override, party_hint) + ("655f96f7-d406-44ac-bb53-6b2c1ab2909c", "כתב ערר 18.12.24", "appeal", "יואל גולדמן"), + ("13b4795a-4fb7-460e-bddf-a5d282a1a67f", "השלמת מסמכים תמ״א 38", "appeal", "יואל גולדמן"), +] + + +async def main() -> int: + case = await db.get_case_by_number(CASE_NUMBER) + if not case: + print(f"ERROR: case {CASE_NUMBER} not found") + return 1 + case_id = UUID(case["id"]) + print(f"=== Case {CASE_NUMBER} — {case['title']} ===") + print() + + for doc_id, label, doc_type, party_hint in TARGETS: + text = await db.get_document_text(UUID(doc_id)) + if not text: + print(f"SKIP {label} — no extracted_text") + continue + + chars = len(text) + print(f"--- {label} ({chars:,} chars, doc_type={doc_type}) ---") + t0 = time.monotonic() + try: + result = await 
claims_extractor.extract_and_store_claims( + case_id=case_id, + document_id=UUID(doc_id), + text=text, + doc_type=doc_type, + party_hint=party_hint, + ) + except Exception as e: + print(f" FAILED: {e}") + continue + dt = time.monotonic() - t0 + print(f" done in {dt:.1f}s — {json.dumps(result, ensure_ascii=False)}") + print() + + # Final tally + pool = await db.get_pool() + async with pool.acquire() as conn: + rows = await conn.fetch( + """SELECT party_role, claim_type, source_document, count(*) as n + FROM claims WHERE case_id = $1 + GROUP BY 1, 2, 3 ORDER BY 1, 3""", + case_id, + ) + print("=== Final claims breakdown ===") + total = 0 + for r in rows: + n = r["n"] + total += n + print(f" {r['party_role']:12} {r['claim_type']:10} ({n:3}) ← {r['source_document']}") + print(f" TOTAL: {total} claims") + return 0 + + +if __name__ == "__main__": + sys.exit(asyncio.run(main())) diff --git a/scripts/SCRIPTS.md b/scripts/SCRIPTS.md index 629ca13..6eaf3d8 100644 --- a/scripts/SCRIPTS.md +++ b/scripts/SCRIPTS.md @@ -32,6 +32,7 @@ | `export-decision-docx.py` | ייצוא החלטה ל-DOCX | MCP: `export_docx()` | | `extract-citations.py` | חילוץ ציטוטי פסיקה מבלוק י | MCP service: `references_extractor.py` | | `extract-claims.py` | חילוץ טענות מבלוק ז | MCP: `extract_claims()` + `claims_extractor.py` | +| `extract_claims_8174.py` | חד-פעמי — חילוץ טענות חסרות לתיק 8174-24 אחרי timeout של האנליסט (43 טענות עורר נוספו 30/04/26) | phase 1: `claude_session` async + 30min timeout + chunking סמנטי | | `extract_all_google_vision.py` | OCR בכמות עם Google Vision | MCP: `document_upload()` pipeline | | `extract_originals.py` | חילוץ טקסט מ-PDF עם Claude Opus | MCP service: `extractor.py` | | `extract_originals_ocr.py` | חילוץ OCR מלא מ-PDF | MCP service: `extractor.py` |