Files
legal-ai/mcp-server/src/legal_mcp/services/appraiser_facts_extractor.py
Chaim 3a05e30c8d
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 1m38s
fix(appraiser-facts): route extraction through analyst wakeup (was silent 0)
The "חלץ עובדות שמאיות" UI button hit POST /api/cases/{n}/extract-appraiser-facts
which called appraiser_facts_extractor inline — that shells out to the local
`claude` CLI, which is absent in the Coolify container, so every doc errored,
the per-doc try/except swallowed it, and the response was "completed, 0 facts".

Refactored the endpoint to wake the legal-analyst of the correct company via
Paperclip (same pattern as wake_curator_for_final), and surface
extraction_failed instead of "completed" when every doc errored.
2026-05-26 11:02:55 +00:00

276 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""חילוץ עובדות מובנות משומות שמאי: תכניות חלות והיתרים שניתנו במקרקעין.
תכלית: לבנות את תת-פרק ההיתרים בבלוק ט (תכניות חלות) של ההחלטה, ובמיוחד
לאפשר זיהוי אוטומטי של סתירות בין שמאים שונים על אותו זיהוי (תכנית או היתר).
שמירה ב-DB: טבלת appraiser_facts (case_id, document_id, appraiser_name,
appraiser_side, fact_type, identifier, details JSONB, page_number).
Precondition: כל מסמך doc_type='appraisal' חייב להיות מתויג עם
metadata.appraiser_side מתוך {committee, appellant, deciding}. החילוץ עוצר
ומחזיר status='sides_missing' אם יש מסמכים לא מתויגים.
"""
from __future__ import annotations
import json
import logging
from uuid import UUID
from legal_mcp.services import claude_session, db
logger = logging.getLogger(__name__)
# Allowed sides for an appraiser in an appeals committee case.
# committee = שמאי הוועדה המקומית
# appellant = שמאי העורר / הצד שכנגד הוועדה
# deciding = שמאי מכריע
VALID_APPRAISER_SIDES = {"committee", "appellant", "deciding"}
EXTRACT_FACTS_PROMPT = """אתה מנתח שומות מקרקעין לטובת ועדת ערר לתכנון ובניה.
תפקידך: לחלץ מתוך השומה שתי קטגוריות של עובדות אובייקטיביות שעליהן השמאי מבסס את חוות דעתו:
1. **תכניות חלות** — כל תכנית/תמ"א/תב"ע/תכנית מתאר/תכנית מפורטת שצוינה כתקפה על המקרקעין.
2. **היתרים** — כל היתר בנייה/היתר שימוש/היתר חורג שצוין כאילו ניתן (או שלא ניתן) במקרקעין.
## כללים
- חילוץ עובדתי בלבד — לא לפרש, לא להסיק, לא להעתיק טיעונים משפטיים. רק העובדה היבשה שהשמאי מציין.
- שמור על נאמנות מוחלטת לזיהוי כפי שמופיע במקור (למשל "תמ"א 38" ולא "תמא 38" או "תכנית מתאר ארצית 38").
- אם השמאי מזכיר אותה תכנית/היתר מספר פעמים — החזר רשומה אחת מאוחדת.
- אם יש סתירה פנימית בשומה (השמאי כותב דבר אחד ואז את ההיפך) — שתי רשומות נפרדות.
- ציטוט המקור (raw_quote) חייב להיות העתקה מילולית של המשפט הרלוונטי, עד 200 תווים.
## פלט
החזר JSON array בלבד — ללא markdown, ללא הסברים:
[
{
"fact_type": "plan" | "permit",
"identifier": "תמ\\"א 38" | "היתר 2018/0123",
"details": {
"date": "תאריך אישור/הוצאה אם צוין, אחרת ריק",
"scope": "תיאור היקף/שימוש/זכויות בנייה — בקצרה",
"conditions": "תנאים מיוחדים אם צוינו",
"status": "תקף / פקע / מבוטל / לא צוין",
"raw_quote": "ציטוט מילולי מהשומה"
},
"page_number": null
}
]
אם אין תכניות או היתרים בשומה — החזר [].
"""
def _chunk_text(text: str, max_chars: int = 25000) -> list[str]:
"""Split a long document at paragraph boundaries."""
if len(text) <= max_chars:
return [text]
chunks: list[str] = []
pos = 0
while pos < len(text):
end = min(pos + max_chars, len(text))
if end < len(text):
break_pos = text.rfind("\n\n", pos, end)
if break_pos > pos + max_chars // 2:
end = break_pos
chunks.append(text[pos:end])
pos = end
return chunks
def _normalize_identifier(identifier: str) -> str:
"""Light normalization so trivial spacing differences don't mask conflicts."""
return " ".join(identifier.strip().split())
async def extract_facts_from_document(
case_id: UUID,
document_id: UUID,
appraiser_name: str,
appraiser_side: str,
text: str,
) -> list[dict]:
"""Extract structured facts from a single appraisal document via Claude Code."""
chunks = _chunk_text(text)
all_facts: list[dict] = []
for i, chunk in enumerate(chunks):
chunk_label = f" (חלק {i+1}/{len(chunks)})" if len(chunks) > 1 else ""
prompt = (
f"{EXTRACT_FACTS_PROMPT}\n\n"
f"שמאי: {appraiser_name}{chunk_label}\n\n"
f"--- תחילת שומה ---\n{chunk}\n--- סוף שומה ---"
)
result = await claude_session.query_json(prompt)
if not isinstance(result, list):
logger.warning(
"extract_facts_from_document: chunk %d returned non-list (%s) for doc=%s",
i, type(result).__name__, document_id,
)
continue
for item in result:
if not isinstance(item, dict):
continue
if item.get("fact_type") not in ("plan", "permit"):
continue
ident = item.get("identifier", "").strip()
if not ident:
continue
all_facts.append({
"appraiser_name": appraiser_name,
"appraiser_side": appraiser_side,
"fact_type": item["fact_type"],
"identifier": _normalize_identifier(ident),
"details": item.get("details") or {},
"page_number": item.get("page_number"),
})
await db.replace_appraiser_facts(case_id, document_id, all_facts)
return all_facts
def _doc_metadata(doc: dict) -> dict:
metadata = doc.get("metadata") or {}
if isinstance(metadata, str):
try:
metadata = json.loads(metadata)
except json.JSONDecodeError:
metadata = {}
return metadata if isinstance(metadata, dict) else {}
def _infer_appraiser_name(doc: dict) -> str:
"""Best-effort extraction of the appraiser's name from document title/metadata."""
meta = _doc_metadata(doc)
name = meta.get("appraiser_name")
if name:
return name
title = doc.get("title", "")
return title or f"שמאי (מסמך {doc.get('id', '')[:8]})"
def _get_appraiser_side(doc: dict) -> str:
"""Return the tagged side, or '' if not tagged."""
return _doc_metadata(doc).get("appraiser_side", "") or ""
def _validate_sides_tagged(appraisals: list[dict]) -> list[dict]:
"""Return the subset of appraisals missing a valid appraiser_side tag."""
missing: list[dict] = []
for doc in appraisals:
side = _get_appraiser_side(doc)
if side not in VALID_APPRAISER_SIDES:
missing.append({
"document_id": doc["id"],
"title": doc.get("title", ""),
"current_side": side,
})
return missing
async def extract_appraiser_facts(case_id: UUID) -> dict:
"""Extract facts from every appraisal document in the case + detect conflicts.
Blocks if any appraisal is missing metadata.appraiser_side — the chair must
tag each one via the UI before extraction runs, so that conflict rendering
in block-tet can identify the deciding appraiser's view as authoritative.
Returns a summary dict ready for serialization back to the caller.
"""
docs = await db.list_documents(case_id)
appraisals = [d for d in docs if d.get("doc_type") == "appraisal"]
if not appraisals:
return {
"status": "no_appraisals",
"appraisal_count": 0,
"total_facts": 0,
"conflicts": [],
}
missing_sides = _validate_sides_tagged(appraisals)
if missing_sides:
return {
"status": "sides_missing",
"appraisal_count": len(appraisals),
"missing": missing_sides,
"message": (
"חסר תיוג appraiser_side במסמכי שומה. תייג כל שומה דרך ה-UI "
"(ועדה / עורר / מכריע) והרץ שוב."
),
}
by_doc = []
total_facts = 0
for doc in appraisals:
text = await db.get_document_text(UUID(doc["id"]))
if not text:
by_doc.append({
"document_id": doc["id"],
"title": doc.get("title", ""),
"status": "no_text",
"facts_extracted": 0,
})
continue
appraiser_name = _infer_appraiser_name(doc)
appraiser_side = _get_appraiser_side(doc)
try:
facts = await extract_facts_from_document(
case_id=case_id,
document_id=UUID(doc["id"]),
appraiser_name=appraiser_name,
appraiser_side=appraiser_side,
text=text,
)
except Exception as e:
logger.exception("Failed to extract facts for document %s", doc["id"])
by_doc.append({
"document_id": doc["id"],
"title": doc.get("title", ""),
"status": "error",
"error": str(e),
"facts_extracted": 0,
})
continue
total_facts += len(facts)
by_doc.append({
"document_id": doc["id"],
"title": doc.get("title", ""),
"appraiser_name": appraiser_name,
"appraiser_side": appraiser_side,
"status": "completed",
"facts_extracted": len(facts),
"plans": sum(1 for f in facts if f["fact_type"] == "plan"),
"permits": sum(1 for f in facts if f["fact_type"] == "permit"),
})
conflicts = await db.detect_appraiser_conflicts(case_id)
# Don't swallow extractor failures: if every appraisal errored and no
# facts were extracted, surface that as a distinct status instead of
# the misleading "completed, 0 facts" we used to return — the caller
# (and the UI) need to know that nothing actually ran.
all_errored = (
total_facts == 0
and by_doc
and all(d.get("status") == "error" for d in by_doc)
)
status = "extraction_failed" if all_errored else "completed"
return {
"status": status,
"appraisal_count": len(appraisals),
"total_facts": total_facts,
"conflicts": conflicts,
"by_document": by_doc,
}
async def detect_conflicts(case_id: UUID) -> list[dict]:
"""Convenience wrapper around db.detect_appraiser_conflicts."""
return await db.detect_appraiser_conflicts(case_id)