Files
legal-ai/mcp-server/src/legal_mcp/services/appraiser_facts_extractor.py
Chaim c619c22a51
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 1m26s
Add pre-ruling interim draft (טיוטת ביניים) for appeals committee
Lets the chair generate a partial decision DOCX before the discussion-and-
ruling block is decided. Same template, skill and DOCX styling as the final
decision (David, RTL, bookmarks) — only the block selection and order differ:
רקע (ו) → תכניות+היתרים (ט) → טענות (ז) → הליכים (ח). The opening (ה),
ruling (י), summary (יא), and signatures (יב) are omitted.

- New appraiser_facts table + CRUD + conflict detection in db.py (V5 schema).
  Conflict = same plan/permit identifier reported differently by 2+ appraisers.
- New appraiser_facts_extractor service: per-appraisal Claude extraction of
  plans + permits with raw quotes and page numbers.
- block-tet prompt extended with a permits sub-section sourced from the
  extracted facts, plus an explicit instruction to flag inter-appraiser
  conflicts in neutral wording without resolving them (deferred to block-yod).
- block-chet prompt extended with a post-hearing materials context sourced
  from documents.metadata.is_post_hearing.
- docx_exporter.export_decision now accepts mode='interim' which reorders
  the blocks per the chair's mental model and writes
  טיוטת-ביניים-v{N}.docx (versioned independently of regular drafts).
- 3 new MCP tools: extract_appraiser_facts, write_interim_draft,
  export_interim_draft. write_interim_draft auto-runs extraction if the
  appraiser_facts table is empty for the case.
2026-04-18 13:28:04 +00:00

206 lines
7.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""חילוץ עובדות מובנות משומות שמאי: תכניות חלות והיתרים שניתנו במקרקעין.
תכלית: לבנות את תת-פרק ההיתרים בבלוק ט (תכניות חלות) של ההחלטה, ובמיוחד
לאפשר זיהוי אוטומטי של סתירות בין שמאים שונים על אותו זיהוי (תכנית או היתר).
שמירה ב-DB: טבלת appraiser_facts (case_id, document_id, appraiser_name,
fact_type, identifier, details JSONB, page_number).
"""
from __future__ import annotations
import logging
from uuid import UUID
from legal_mcp.services import claude_session, db
logger = logging.getLogger(__name__)
EXTRACT_FACTS_PROMPT = """אתה מנתח שומות מקרקעין לטובת ועדת ערר לתכנון ובניה.
תפקידך: לחלץ מתוך השומה שתי קטגוריות של עובדות אובייקטיביות שעליהן השמאי מבסס את חוות דעתו:
1. **תכניות חלות** — כל תכנית/תמ"א/תב"ע/תכנית מתאר/תכנית מפורטת שצוינה כתקפה על המקרקעין.
2. **היתרים** — כל היתר בנייה/היתר שימוש/היתר חורג שצוין כאילו ניתן (או שלא ניתן) במקרקעין.
## כללים
- חילוץ עובדתי בלבד — לא לפרש, לא להסיק, לא להעתיק טיעונים משפטיים. רק העובדה היבשה שהשמאי מציין.
- שמור על נאמנות מוחלטת לזיהוי כפי שמופיע במקור (למשל "תמ"א 38" ולא "תמא 38" או "תכנית מתאר ארצית 38").
- אם השמאי מזכיר אותה תכנית/היתר מספר פעמים — החזר רשומה אחת מאוחדת.
- אם יש סתירה פנימית בשומה (השמאי כותב דבר אחד ואז את ההיפך) — שתי רשומות נפרדות.
- ציטוט המקור (raw_quote) חייב להיות העתקה מילולית של המשפט הרלוונטי, עד 200 תווים.
## פלט
החזר JSON array בלבד — ללא markdown, ללא הסברים:
[
{
"fact_type": "plan" | "permit",
"identifier": "תמ\\"א 38" | "היתר 2018/0123",
"details": {
"date": "תאריך אישור/הוצאה אם צוין, אחרת ריק",
"scope": "תיאור היקף/שימוש/זכויות בנייה — בקצרה",
"conditions": "תנאים מיוחדים אם צוינו",
"status": "תקף / פקע / מבוטל / לא צוין",
"raw_quote": "ציטוט מילולי מהשומה"
},
"page_number": null
}
]
אם אין תכניות או היתרים בשומה — החזר [].
"""
def _chunk_text(text: str, max_chars: int = 25000) -> list[str]:
"""Split a long document at paragraph boundaries."""
if len(text) <= max_chars:
return [text]
chunks: list[str] = []
pos = 0
while pos < len(text):
end = min(pos + max_chars, len(text))
if end < len(text):
break_pos = text.rfind("\n\n", pos, end)
if break_pos > pos + max_chars // 2:
end = break_pos
chunks.append(text[pos:end])
pos = end
return chunks
def _normalize_identifier(identifier: str) -> str:
"""Light normalization so trivial spacing differences don't mask conflicts."""
return " ".join(identifier.strip().split())
async def extract_facts_from_document(
case_id: UUID,
document_id: UUID,
appraiser_name: str,
text: str,
) -> list[dict]:
"""Extract structured facts from a single appraisal document via Claude Code."""
chunks = _chunk_text(text)
all_facts: list[dict] = []
for i, chunk in enumerate(chunks):
chunk_label = f" (חלק {i+1}/{len(chunks)})" if len(chunks) > 1 else ""
prompt = (
f"{EXTRACT_FACTS_PROMPT}\n\n"
f"שמאי: {appraiser_name}{chunk_label}\n\n"
f"--- תחילת שומה ---\n{chunk}\n--- סוף שומה ---"
)
result = claude_session.query_json(prompt, timeout=180)
if not isinstance(result, list):
logger.warning(
"extract_facts_from_document: chunk %d returned non-list (%s) for doc=%s",
i, type(result).__name__, document_id,
)
continue
for item in result:
if not isinstance(item, dict):
continue
if item.get("fact_type") not in ("plan", "permit"):
continue
ident = item.get("identifier", "").strip()
if not ident:
continue
all_facts.append({
"appraiser_name": appraiser_name,
"fact_type": item["fact_type"],
"identifier": _normalize_identifier(ident),
"details": item.get("details") or {},
"page_number": item.get("page_number"),
})
if all_facts:
await db.replace_appraiser_facts(case_id, document_id, all_facts)
else:
await db.replace_appraiser_facts(case_id, document_id, [])
return all_facts
def _infer_appraiser_name(doc: dict) -> str:
"""Best-effort extraction of the appraiser's name from document title/metadata."""
metadata = doc.get("metadata") or {}
name = metadata.get("appraiser_name") if isinstance(metadata, dict) else None
if name:
return name
title = doc.get("title", "")
return title or f"שמאי (מסמך {doc.get('id', '')[:8]})"
async def extract_appraiser_facts(case_id: UUID) -> dict:
"""Extract facts from every appraisal document in the case + detect conflicts.
Returns a summary dict ready for serialization back to the caller.
"""
docs = await db.list_documents(case_id)
appraisals = [d for d in docs if d.get("doc_type") == "appraisal"]
if not appraisals:
return {
"status": "no_appraisals",
"appraisal_count": 0,
"total_facts": 0,
"conflicts": [],
}
by_doc = []
total_facts = 0
for doc in appraisals:
text = await db.get_document_text(UUID(doc["id"]))
if not text:
by_doc.append({
"document_id": doc["id"],
"title": doc.get("title", ""),
"status": "no_text",
"facts_extracted": 0,
})
continue
appraiser_name = _infer_appraiser_name(doc)
try:
facts = await extract_facts_from_document(
case_id=case_id,
document_id=UUID(doc["id"]),
appraiser_name=appraiser_name,
text=text,
)
except Exception as e:
logger.exception("Failed to extract facts for document %s", doc["id"])
by_doc.append({
"document_id": doc["id"],
"title": doc.get("title", ""),
"status": "error",
"error": str(e),
"facts_extracted": 0,
})
continue
total_facts += len(facts)
by_doc.append({
"document_id": doc["id"],
"title": doc.get("title", ""),
"appraiser_name": appraiser_name,
"status": "completed",
"facts_extracted": len(facts),
"plans": sum(1 for f in facts if f["fact_type"] == "plan"),
"permits": sum(1 for f in facts if f["fact_type"] == "permit"),
})
conflicts = await db.detect_appraiser_conflicts(case_id)
return {
"status": "completed",
"appraisal_count": len(appraisals),
"total_facts": total_facts,
"conflicts": conflicts,
"by_document": by_doc,
}
async def detect_conflicts(case_id: UUID) -> list[dict]:
"""Convenience wrapper around db.detect_appraiser_conflicts."""
return await db.detect_appraiser_conflicts(case_id)