Files
legal-ai/mcp-server/src/legal_mcp/services/appraiser_facts_extractor.py
Chaim c536ed0e63
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 1m26s
Edit document doc_type and appraiser side from the case UI
Until now changing a document's doc_type required a manual SQL update.
Adds an inline editor on the document badge so the chair can retag
without leaving the case page, and threads an appraiser_side tag
(committee / appellant / deciding) through the appraisal pipeline so
betterment-levy cases — which usually have 2-3 appraisers — render
conflicts with the deciding appraiser's view marked as governing.

Backend
- New appraiser_facts.appraiser_side column (V5.1) populated from
  documents.metadata.appraiser_side at extraction time.
- extract_appraiser_facts now returns status='sides_missing' with the
  list of untagged appraisals instead of running with empty side
  labels — chair must tag every appraisal first via the UI.
- Conflict detection orders entries committee → appellant → deciding so
  the deciding appraiser appears last; block-tet's prompt instructs the
  writer to phrase the deciding appraiser's view as the governing
  factual finding ("ואולם, השמאי המכריע קבע...").
- New PATCH /api/cases/{n}/documents/{doc_id} (Pydantic model with
  whitelist validation) and matching document_update MCP tool. Both
  merge appraiser_side into metadata JSONB instead of touching the
  schema.

UI
- New shared doc-types module exports the canonical 11 doc_type
  options plus the 3 appraiser-side options; both upload-sheet and
  the document badge now read from it instead of duplicating Hebrew
  labels.
- New DocumentTypeEditor renders a Popover off the doc-type Badge
  with two Selects. The save button stays disabled while doc_type is
  appraisal but no side has been picked, mirroring the backend
  enforcement so the user finds out before triggering extraction.
- usePatchDocument React-Query mutation invalidates the case detail
  on success so the badge updates without a manual refresh.
2026-04-19 06:26:51 +00:00

265 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""חילוץ עובדות מובנות משומות שמאי: תכניות חלות והיתרים שניתנו במקרקעין.
תכלית: לבנות את תת-פרק ההיתרים בבלוק ט (תכניות חלות) של ההחלטה, ובמיוחד
לאפשר זיהוי אוטומטי של סתירות בין שמאים שונים על אותו זיהוי (תכנית או היתר).
שמירה ב-DB: טבלת appraiser_facts (case_id, document_id, appraiser_name,
appraiser_side, fact_type, identifier, details JSONB, page_number).
Precondition: כל מסמך doc_type='appraisal' חייב להיות מתויג עם
metadata.appraiser_side מתוך {committee, appellant, deciding}. החילוץ עוצר
ומחזיר status='sides_missing' אם יש מסמכים לא מתויגים.
"""
from __future__ import annotations
import json
import logging
from uuid import UUID
from legal_mcp.services import claude_session, db
logger = logging.getLogger(__name__)
# Allowed sides for an appraiser in an appeals committee case.
# committee = שמאי הוועדה המקומית
# appellant = שמאי העורר / הצד שכנגד הוועדה
# deciding = שמאי מכריע
VALID_APPRAISER_SIDES = {"committee", "appellant", "deciding"}
EXTRACT_FACTS_PROMPT = """אתה מנתח שומות מקרקעין לטובת ועדת ערר לתכנון ובניה.
תפקידך: לחלץ מתוך השומה שתי קטגוריות של עובדות אובייקטיביות שעליהן השמאי מבסס את חוות דעתו:
1. **תכניות חלות** — כל תכנית/תמ"א/תב"ע/תכנית מתאר/תכנית מפורטת שצוינה כתקפה על המקרקעין.
2. **היתרים** — כל היתר בנייה/היתר שימוש/היתר חורג שצוין כאילו ניתן (או שלא ניתן) במקרקעין.
## כללים
- חילוץ עובדתי בלבד — לא לפרש, לא להסיק, לא להעתיק טיעונים משפטיים. רק העובדה היבשה שהשמאי מציין.
- שמור על נאמנות מוחלטת לזיהוי כפי שמופיע במקור (למשל "תמ"א 38" ולא "תמא 38" או "תכנית מתאר ארצית 38").
- אם השמאי מזכיר אותה תכנית/היתר מספר פעמים — החזר רשומה אחת מאוחדת.
- אם יש סתירה פנימית בשומה (השמאי כותב דבר אחד ואז את ההיפך) — שתי רשומות נפרדות.
- ציטוט המקור (raw_quote) חייב להיות העתקה מילולית של המשפט הרלוונטי, עד 200 תווים.
## פלט
החזר JSON array בלבד — ללא markdown, ללא הסברים:
[
{
"fact_type": "plan" | "permit",
"identifier": "תמ\\"א 38" | "היתר 2018/0123",
"details": {
"date": "תאריך אישור/הוצאה אם צוין, אחרת ריק",
"scope": "תיאור היקף/שימוש/זכויות בנייה — בקצרה",
"conditions": "תנאים מיוחדים אם צוינו",
"status": "תקף / פקע / מבוטל / לא צוין",
"raw_quote": "ציטוט מילולי מהשומה"
},
"page_number": null
}
]
אם אין תכניות או היתרים בשומה — החזר [].
"""
def _chunk_text(text: str, max_chars: int = 25000) -> list[str]:
"""Split a long document at paragraph boundaries."""
if len(text) <= max_chars:
return [text]
chunks: list[str] = []
pos = 0
while pos < len(text):
end = min(pos + max_chars, len(text))
if end < len(text):
break_pos = text.rfind("\n\n", pos, end)
if break_pos > pos + max_chars // 2:
end = break_pos
chunks.append(text[pos:end])
pos = end
return chunks
def _normalize_identifier(identifier: str) -> str:
"""Light normalization so trivial spacing differences don't mask conflicts."""
return " ".join(identifier.strip().split())
async def extract_facts_from_document(
case_id: UUID,
document_id: UUID,
appraiser_name: str,
appraiser_side: str,
text: str,
) -> list[dict]:
"""Extract structured facts from a single appraisal document via Claude Code."""
chunks = _chunk_text(text)
all_facts: list[dict] = []
for i, chunk in enumerate(chunks):
chunk_label = f" (חלק {i+1}/{len(chunks)})" if len(chunks) > 1 else ""
prompt = (
f"{EXTRACT_FACTS_PROMPT}\n\n"
f"שמאי: {appraiser_name}{chunk_label}\n\n"
f"--- תחילת שומה ---\n{chunk}\n--- סוף שומה ---"
)
result = claude_session.query_json(prompt, timeout=180)
if not isinstance(result, list):
logger.warning(
"extract_facts_from_document: chunk %d returned non-list (%s) for doc=%s",
i, type(result).__name__, document_id,
)
continue
for item in result:
if not isinstance(item, dict):
continue
if item.get("fact_type") not in ("plan", "permit"):
continue
ident = item.get("identifier", "").strip()
if not ident:
continue
all_facts.append({
"appraiser_name": appraiser_name,
"appraiser_side": appraiser_side,
"fact_type": item["fact_type"],
"identifier": _normalize_identifier(ident),
"details": item.get("details") or {},
"page_number": item.get("page_number"),
})
await db.replace_appraiser_facts(case_id, document_id, all_facts)
return all_facts
def _doc_metadata(doc: dict) -> dict:
metadata = doc.get("metadata") or {}
if isinstance(metadata, str):
try:
metadata = json.loads(metadata)
except json.JSONDecodeError:
metadata = {}
return metadata if isinstance(metadata, dict) else {}
def _infer_appraiser_name(doc: dict) -> str:
"""Best-effort extraction of the appraiser's name from document title/metadata."""
meta = _doc_metadata(doc)
name = meta.get("appraiser_name")
if name:
return name
title = doc.get("title", "")
return title or f"שמאי (מסמך {doc.get('id', '')[:8]})"
def _get_appraiser_side(doc: dict) -> str:
"""Return the tagged side, or '' if not tagged."""
return _doc_metadata(doc).get("appraiser_side", "") or ""
def _validate_sides_tagged(appraisals: list[dict]) -> list[dict]:
"""Return the subset of appraisals missing a valid appraiser_side tag."""
missing: list[dict] = []
for doc in appraisals:
side = _get_appraiser_side(doc)
if side not in VALID_APPRAISER_SIDES:
missing.append({
"document_id": doc["id"],
"title": doc.get("title", ""),
"current_side": side,
})
return missing
async def extract_appraiser_facts(case_id: UUID) -> dict:
"""Extract facts from every appraisal document in the case + detect conflicts.
Blocks if any appraisal is missing metadata.appraiser_side — the chair must
tag each one via the UI before extraction runs, so that conflict rendering
in block-tet can identify the deciding appraiser's view as authoritative.
Returns a summary dict ready for serialization back to the caller.
"""
docs = await db.list_documents(case_id)
appraisals = [d for d in docs if d.get("doc_type") == "appraisal"]
if not appraisals:
return {
"status": "no_appraisals",
"appraisal_count": 0,
"total_facts": 0,
"conflicts": [],
}
missing_sides = _validate_sides_tagged(appraisals)
if missing_sides:
return {
"status": "sides_missing",
"appraisal_count": len(appraisals),
"missing": missing_sides,
"message": (
"חסר תיוג appraiser_side במסמכי שומה. תייג כל שומה דרך ה-UI "
"(ועדה / עורר / מכריע) והרץ שוב."
),
}
by_doc = []
total_facts = 0
for doc in appraisals:
text = await db.get_document_text(UUID(doc["id"]))
if not text:
by_doc.append({
"document_id": doc["id"],
"title": doc.get("title", ""),
"status": "no_text",
"facts_extracted": 0,
})
continue
appraiser_name = _infer_appraiser_name(doc)
appraiser_side = _get_appraiser_side(doc)
try:
facts = await extract_facts_from_document(
case_id=case_id,
document_id=UUID(doc["id"]),
appraiser_name=appraiser_name,
appraiser_side=appraiser_side,
text=text,
)
except Exception as e:
logger.exception("Failed to extract facts for document %s", doc["id"])
by_doc.append({
"document_id": doc["id"],
"title": doc.get("title", ""),
"status": "error",
"error": str(e),
"facts_extracted": 0,
})
continue
total_facts += len(facts)
by_doc.append({
"document_id": doc["id"],
"title": doc.get("title", ""),
"appraiser_name": appraiser_name,
"appraiser_side": appraiser_side,
"status": "completed",
"facts_extracted": len(facts),
"plans": sum(1 for f in facts if f["fact_type"] == "plan"),
"permits": sum(1 for f in facts if f["fact_type"] == "permit"),
})
conflicts = await db.detect_appraiser_conflicts(case_id)
return {
"status": "completed",
"appraisal_count": len(appraisals),
"total_facts": total_facts,
"conflicts": conflicts,
"by_document": by_doc,
}
async def detect_conflicts(case_id: UUID) -> list[dict]:
"""Convenience wrapper around db.detect_appraiser_conflicts."""
return await db.detect_appraiser_conflicts(case_id)