Files
legal-ai/mcp-server/src/legal_mcp/services/appraiser_facts_extractor.py
Chaim 28f49defff
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 1m28s
LLM session: async, 30min timeout, semantic chunking + parallel
The claude_session bridge had two structural defects that made any
non-trivial document extraction unreliable:

  1. subprocess.run() blocks the asyncio event loop in the MCP server
     for the full duration of every LLM call (60-180s typical).
  2. The 120-second timeout was below the cold-cache cost of any
     document over ~12K Hebrew characters. Three back-to-back timeouts
     on case 8174-24 dropped 43 appellant claims on the floor.

Phase 1 of the remediation plan — keeps claude_session as the engine
(no Anthropic API switch) and restructures around it:

claude_session.py
  • query / query_json are now async — asyncio.create_subprocess_exec
    instead of subprocess.run, so MCP server can serve other coroutines
    while a call is in flight.
  • DEFAULT_TIMEOUT 120 → 1800 (30 min). High enough that no realistic
    document hits it; bounded so a runaway never zombifies forever.
  • LONG_TIMEOUT 300 → 3600 for opus block writing on full case context.
  • TimeoutError now actually kills the subprocess (asyncio.wait_for
    cancellation alone leaves the child running).

claims_extractor.py
  • _split_by_sections: chunks at numbered sections / Hebrew letter
    headings / "פרק" markers / markdown ##, falls back to paragraph
    breaks, then to hard splits. Targets 12K chars per chunk — small
    enough that each chunk reliably finishes inside the timeout.
  • _extract_chunk: per-chunk retry (1 attempt by default) with
    structured logging on failure. Failed chunks no longer crash the
    overall extraction; they're skipped with a partial-result warning.
  • extract_claims_with_ai now runs chunks in parallel via
    asyncio.gather bounded by a semaphore (CHUNK_CONCURRENCY=3).
    For a 25K-char appeal: was sequential 150-300s, now ~70-90s.

Updated all 9 callers (claims, appraiser facts, block writer, qa
validator, brainstorm, learning loop, style analyzer × 3) to await
the now-async API.

The one-shot scripts/extract_claims_8174.py used to recover 43
appellant claims on case 8174-24 has been moved to .archive/ — phase 1
makes it obsolete. SCRIPTS.md updated.

Phase 2 (background-task wrapper around LLM-bound MCP tools, persistent
llm_tasks table, SSE progress) is the structural follow-up — separate PR.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-30 14:21:35 +00:00

265 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""חילוץ עובדות מובנות משומות שמאי: תכניות חלות והיתרים שניתנו במקרקעין.
תכלית: לבנות את תת-פרק ההיתרים בבלוק ט (תכניות חלות) של ההחלטה, ובמיוחד
לאפשר זיהוי אוטומטי של סתירות בין שמאים שונים על אותו זיהוי (תכנית או היתר).
שמירה ב-DB: טבלת appraiser_facts (case_id, document_id, appraiser_name,
appraiser_side, fact_type, identifier, details JSONB, page_number).
Precondition: כל מסמך doc_type='appraisal' חייב להיות מתויג עם
metadata.appraiser_side מתוך {committee, appellant, deciding}. החילוץ עוצר
ומחזיר status='sides_missing' אם יש מסמכים לא מתויגים.
"""
from __future__ import annotations
import json
import logging
from uuid import UUID
from legal_mcp.services import claude_session, db
logger = logging.getLogger(__name__)
# Allowed sides for an appraiser in an appeals committee case.
# committee = שמאי הוועדה המקומית
# appellant = שמאי העורר / הצד שכנגד הוועדה
# deciding = שמאי מכריע
VALID_APPRAISER_SIDES = {"committee", "appellant", "deciding"}
EXTRACT_FACTS_PROMPT = """אתה מנתח שומות מקרקעין לטובת ועדת ערר לתכנון ובניה.
תפקידך: לחלץ מתוך השומה שתי קטגוריות של עובדות אובייקטיביות שעליהן השמאי מבסס את חוות דעתו:
1. **תכניות חלות** — כל תכנית/תמ"א/תב"ע/תכנית מתאר/תכנית מפורטת שצוינה כתקפה על המקרקעין.
2. **היתרים** — כל היתר בנייה/היתר שימוש/היתר חורג שצוין כאילו ניתן (או שלא ניתן) במקרקעין.
## כללים
- חילוץ עובדתי בלבד — לא לפרש, לא להסיק, לא להעתיק טיעונים משפטיים. רק העובדה היבשה שהשמאי מציין.
- שמור על נאמנות מוחלטת לזיהוי כפי שמופיע במקור (למשל "תמ"א 38" ולא "תמא 38" או "תכנית מתאר ארצית 38").
- אם השמאי מזכיר אותה תכנית/היתר מספר פעמים — החזר רשומה אחת מאוחדת.
- אם יש סתירה פנימית בשומה (השמאי כותב דבר אחד ואז את ההיפך) — שתי רשומות נפרדות.
- ציטוט המקור (raw_quote) חייב להיות העתקה מילולית של המשפט הרלוונטי, עד 200 תווים.
## פלט
החזר JSON array בלבד — ללא markdown, ללא הסברים:
[
{
"fact_type": "plan" | "permit",
"identifier": "תמ\\"א 38" | "היתר 2018/0123",
"details": {
"date": "תאריך אישור/הוצאה אם צוין, אחרת ריק",
"scope": "תיאור היקף/שימוש/זכויות בנייה — בקצרה",
"conditions": "תנאים מיוחדים אם צוינו",
"status": "תקף / פקע / מבוטל / לא צוין",
"raw_quote": "ציטוט מילולי מהשומה"
},
"page_number": null
}
]
אם אין תכניות או היתרים בשומה — החזר [].
"""
def _chunk_text(text: str, max_chars: int = 25000) -> list[str]:
"""Split a long document at paragraph boundaries."""
if len(text) <= max_chars:
return [text]
chunks: list[str] = []
pos = 0
while pos < len(text):
end = min(pos + max_chars, len(text))
if end < len(text):
break_pos = text.rfind("\n\n", pos, end)
if break_pos > pos + max_chars // 2:
end = break_pos
chunks.append(text[pos:end])
pos = end
return chunks
def _normalize_identifier(identifier: str) -> str:
"""Light normalization so trivial spacing differences don't mask conflicts."""
return " ".join(identifier.strip().split())
async def extract_facts_from_document(
case_id: UUID,
document_id: UUID,
appraiser_name: str,
appraiser_side: str,
text: str,
) -> list[dict]:
"""Extract structured facts from a single appraisal document via Claude Code."""
chunks = _chunk_text(text)
all_facts: list[dict] = []
for i, chunk in enumerate(chunks):
chunk_label = f" (חלק {i+1}/{len(chunks)})" if len(chunks) > 1 else ""
prompt = (
f"{EXTRACT_FACTS_PROMPT}\n\n"
f"שמאי: {appraiser_name}{chunk_label}\n\n"
f"--- תחילת שומה ---\n{chunk}\n--- סוף שומה ---"
)
result = await claude_session.query_json(prompt)
if not isinstance(result, list):
logger.warning(
"extract_facts_from_document: chunk %d returned non-list (%s) for doc=%s",
i, type(result).__name__, document_id,
)
continue
for item in result:
if not isinstance(item, dict):
continue
if item.get("fact_type") not in ("plan", "permit"):
continue
ident = item.get("identifier", "").strip()
if not ident:
continue
all_facts.append({
"appraiser_name": appraiser_name,
"appraiser_side": appraiser_side,
"fact_type": item["fact_type"],
"identifier": _normalize_identifier(ident),
"details": item.get("details") or {},
"page_number": item.get("page_number"),
})
await db.replace_appraiser_facts(case_id, document_id, all_facts)
return all_facts
def _doc_metadata(doc: dict) -> dict:
metadata = doc.get("metadata") or {}
if isinstance(metadata, str):
try:
metadata = json.loads(metadata)
except json.JSONDecodeError:
metadata = {}
return metadata if isinstance(metadata, dict) else {}
def _infer_appraiser_name(doc: dict) -> str:
"""Best-effort extraction of the appraiser's name from document title/metadata."""
meta = _doc_metadata(doc)
name = meta.get("appraiser_name")
if name:
return name
title = doc.get("title", "")
return title or f"שמאי (מסמך {doc.get('id', '')[:8]})"
def _get_appraiser_side(doc: dict) -> str:
"""Return the tagged side, or '' if not tagged."""
return _doc_metadata(doc).get("appraiser_side", "") or ""
def _validate_sides_tagged(appraisals: list[dict]) -> list[dict]:
"""Return the subset of appraisals missing a valid appraiser_side tag."""
missing: list[dict] = []
for doc in appraisals:
side = _get_appraiser_side(doc)
if side not in VALID_APPRAISER_SIDES:
missing.append({
"document_id": doc["id"],
"title": doc.get("title", ""),
"current_side": side,
})
return missing
async def extract_appraiser_facts(case_id: UUID) -> dict:
"""Extract facts from every appraisal document in the case + detect conflicts.
Blocks if any appraisal is missing metadata.appraiser_side — the chair must
tag each one via the UI before extraction runs, so that conflict rendering
in block-tet can identify the deciding appraiser's view as authoritative.
Returns a summary dict ready for serialization back to the caller.
"""
docs = await db.list_documents(case_id)
appraisals = [d for d in docs if d.get("doc_type") == "appraisal"]
if not appraisals:
return {
"status": "no_appraisals",
"appraisal_count": 0,
"total_facts": 0,
"conflicts": [],
}
missing_sides = _validate_sides_tagged(appraisals)
if missing_sides:
return {
"status": "sides_missing",
"appraisal_count": len(appraisals),
"missing": missing_sides,
"message": (
"חסר תיוג appraiser_side במסמכי שומה. תייג כל שומה דרך ה-UI "
"(ועדה / עורר / מכריע) והרץ שוב."
),
}
by_doc = []
total_facts = 0
for doc in appraisals:
text = await db.get_document_text(UUID(doc["id"]))
if not text:
by_doc.append({
"document_id": doc["id"],
"title": doc.get("title", ""),
"status": "no_text",
"facts_extracted": 0,
})
continue
appraiser_name = _infer_appraiser_name(doc)
appraiser_side = _get_appraiser_side(doc)
try:
facts = await extract_facts_from_document(
case_id=case_id,
document_id=UUID(doc["id"]),
appraiser_name=appraiser_name,
appraiser_side=appraiser_side,
text=text,
)
except Exception as e:
logger.exception("Failed to extract facts for document %s", doc["id"])
by_doc.append({
"document_id": doc["id"],
"title": doc.get("title", ""),
"status": "error",
"error": str(e),
"facts_extracted": 0,
})
continue
total_facts += len(facts)
by_doc.append({
"document_id": doc["id"],
"title": doc.get("title", ""),
"appraiser_name": appraiser_name,
"appraiser_side": appraiser_side,
"status": "completed",
"facts_extracted": len(facts),
"plans": sum(1 for f in facts if f["fact_type"] == "plan"),
"permits": sum(1 for f in facts if f["fact_type"] == "permit"),
})
conflicts = await db.detect_appraiser_conflicts(case_id)
return {
"status": "completed",
"appraisal_count": len(appraisals),
"total_facts": total_facts,
"conflicts": conflicts,
"by_document": by_doc,
}
async def detect_conflicts(case_id: UUID) -> list[dict]:
"""Convenience wrapper around db.detect_appraiser_conflicts."""
return await db.detect_appraiser_conflicts(case_id)