Files
legal-ai/mcp-server/src/legal_mcp/services/block_writer.py

1094 lines
46 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""מנוע כתיבת בלוקים להחלטת ועדת ערר.
מייצר טקסט בפועל לכל בלוק (ה-יב) בהתבסס על:
- block-schema.md (פרמטרים, constraints, מבנה)
- SKILL.md (סגנון דפנה)
- חומרי המקור (מסמכים, טענות, פסיקה)
- מסמך כיוון (חובה לבלוק י)
בלוקים א-ד ויב = template-fill (ללא AI).
בלוקים ה-יא = AI generation עם Claude.
"""
from __future__ import annotations
import json
import logging
import re
from datetime import date
from uuid import UUID
from legal_mcp import config
from legal_mcp.services import db, embeddings, claude_session, audit
from legal_mcp.services.lessons import get_content_checklist, get_methodology_summary
logger = logging.getLogger(__name__)
# ── Block configuration ───────────────────────────────────────────
# Output token limits per Anthropic docs:
# Opus 4.7: up to 128K output tokens (new tokenizer — ~35% more tokens)
# Sonnet 4.6: up to 64K output tokens
# Streaming required when max_tokens > 21,333
BLOCK_CONFIG = {
"block-alef": {"index": 1, "title": "כותרת מוסדית", "gen_type": "template-fill", "temp": 0, "model": "script"},
"block-bet": {"index": 2, "title": "הרכב הוועדה", "gen_type": "template-fill", "temp": 0, "model": "script"},
"block-gimel":{"index": 3, "title": "צדדים", "gen_type": "template-fill", "temp": 0, "model": "script"},
"block-dalet":{"index": 4, "title": "החלטה", "gen_type": "template-fill", "temp": 0, "model": "script"},
"block-he": {"index": 5, "title": "פתיחה", "gen_type": "paraphrase", "temp": 0.2, "model": "sonnet", "max_tokens": 4096},
"block-vav": {"index": 6, "title": "רקע עובדתי", "gen_type": "reproduction", "temp": 0, "model": "sonnet", "max_tokens": 16384},
"block-zayin":{"index": 7, "title": "טענות הצדדים", "gen_type": "paraphrase", "temp": 0.1, "model": "sonnet", "max_tokens": 16384},
"block-chet": {"index": 8, "title": "הליכים", "gen_type": "reproduction", "temp": 0, "model": "sonnet", "max_tokens": 8192},
"block-tet": {"index": 9, "title": "תכניות חלות", "gen_type": "guided-synthesis", "temp": 0.2, "model": "opus", "max_tokens": 16384},
"block-yod": {"index": 10, "title": "דיון והכרעה", "gen_type": "rhetorical-construction", "temp": 0.4, "model": "opus", "max_tokens": 16384},
"block-yod-alef": {"index": 11, "title": "סיכום", "gen_type": "paraphrase", "temp": 0.1, "model": "sonnet", "max_tokens": 8192},
"block-yod-bet": {"index": 12, "title": "חתימות", "gen_type": "template-fill", "temp": 0, "model": "script"},
}
MODEL_MAP = {
"sonnet": "claude-sonnet-4-20250514",
"opus": "claude-opus-4-7",
}
# ── Template blocks (א-ד, יב) ────────────────────────────────────
def write_block_alef(case: dict, decision: dict | None = None) -> str:
"""כותרת מוסדית."""
return f"""מדינת ישראל
ועדת ערר לתכנון ולבנייה — מחוז ירושלים
ערר מס' {case['case_number']}"""
def write_block_bet(case: dict, decision: dict | None = None) -> str:
"""הרכב הוועדה."""
panel = (decision or {}).get("panel_members", [])
members_text = ""
if panel:
for m in panel:
members_text += f"\n{m}"
return f"""בפני:
עו"ד דפנה תמיר, יו"ר{members_text}"""
def write_block_gimel(case: dict, decision: dict | None = None) -> str:
"""צדדים."""
appellants = "\n".join(case.get("appellants", ["(לא צוין)"]))
respondents = "\n".join(case.get("respondents", ["(לא צוין)"]))
return f"""{appellants}
נגד
{respondents}"""
def write_block_dalet(case: dict, decision: dict | None = None) -> str:
"""כותרת החלטה."""
return "החלטה"
def write_block_yod_bet(case: dict, decision: dict | None = None) -> str:
"""חתימות."""
today = date.today().strftime("%d.%m.%Y")
return f"""ניתנה היום, {today}, פה אחד.
דפנה תמיר, עו"ד
יו"ר ועדת הערר"""
TEMPLATE_WRITERS = {
"block-alef": write_block_alef,
"block-bet": write_block_bet,
"block-gimel": write_block_gimel,
"block-dalet": write_block_dalet,
"block-yod-bet": write_block_yod_bet,
}
# ── AI-generated blocks (ה-יא) ───────────────────────────────────
BLOCK_PROMPTS = {
"block-he": """כתוב את בלוק הפתיחה (בלוק ה) של החלטת ועדת ערר.
## כללים:
- פתח ב"לפנינו ערר..." או "עניינה של החלטה זו..."
- הגדר "להלן" מרכזיים: הוועדה המקומית, התכנית/הבקשה, המגרש
- 1-2 סעיפים בלבד
- אין ניתוח, אין ערכי שיפוט, אין ציטוטים מצדדים
- מספור: 1.
## פרטי התיק:
{case_context}
## חומרי מקור:
{source_context}""",
"block-vav": """כתוב את בלוק הרקע העובדתי (בלוק ו, "פתח דבר") של החלטת ועדת ערר.
## כללים קריטיים:
- **רקע ניטרלי** — עובדות בלבד. אין ציטוטים ישירים מצדדים. אין מילות ערך/שיפוט ("חריג", "חטא", "בעייתי").
- סדר פנימי: מקרקעין → סביבה → היסטוריה תכנונית → מהות הבקשה → החלטת הוועדה → הגשת הערר
- סמן מיקומי תמונות: [📷 מיקום GIS], [📷 תשריט]
- ציטוט מפרוטוקול ועדה מקומית (אם יש) כ-blockquote
- מספור רציף מהבלוק הקודם
## פרטי התיק:
{case_context}
## חומרי מקור:
{source_context}""",
"block-zayin": """כתוב את בלוק טענות הצדדים (בלוק ז, "תמצית טענות הצדדים") של החלטת ועדת ערר.
## כללים קריטיים:
- **סנתז טענות דומות** — אל תרשום כל טענה בנפרד. קבץ טענות דומות לנושא אחד. למשל: כל הטענות על הודעות → סעיף אחד, כל הטענות על רכוש משותף → סעיף אחד.
- גוף שלישי: "העוררים טוענים כי...", "הוועדה המקומית ציינה כי..."
- **מבנה קבוע עם 3 חלקים:**
1. "טענות העוררים" — 8-12 סעיפים מקובצים לפי נושא
2. "עמדת הוועדה המקומית" — 5-8 סעיפים
3. "עמדת מבקשי ההיתר" (אם יש) — 5-10 סעיפים
- כותרת: "תמצית טענות הצדדים"
- נאמנות למקור — לא להמציא טענות, אבל כן לאחד ולסכם טענות חוזרות
- אין ניתוח, אין מסקנות, אין הערכה ("טענה חלשה/חזקה")
- רק מכתבי טענות מקוריים (לא השלמות טיעון)
- מספור רציף
- **יעד אורך: 800-1500 מילים**
## טענות שחולצו (קבץ טענות דומות לנושאים):
{claims_context}
## פרטי התיק:
{case_context}""",
"block-chet": """כתוב את בלוק ההליכים (בלוק ח, "ההליכים בפני ועדת הערר") של החלטת ועדת ערר.
## כללים:
- תיעוד כרונולוגי: דיון → סיור → השלמות טיעון → משא-ומתן לפשרה (אם היה) → החלטות ביניים
- תאריכים מדויקים
- אם בדיון עלו נקודות חדשות או הובהרו סוגיות משפטיות — ציין זאת במפורש בסעיף נפרד
- תוכן כל השלמת טיעון/הצעת פשרה בסעיף נפרד עם תאריך
- סמן תמונות מסיור: [📷 צילום מסיור]
- אין ניתוח או הערכה
- מספור רציף
## פרטי התיק:
{case_context}
## מסמכים שהוגשו לאחר הדיון (אם יש):
{post_hearing_context}
## חומרי מקור:
{source_context}""",
"block-tet": """כתוב את בלוק התכניות החלות (בלוק ט) של החלטת ועדת ערר, **כולל תת-פרק היתרים**.
## מבנה נדרש:
1. **תכניות חלות** — מבנה הירכי: תכניות ארציות → מחוזיות → מקומיות. ציטוט ישיר מהוראות תכנית עם **הדגשה** של מילים מכריעות.
2. **תת-פרק היתרים** — כותרת משנה "היתרים" (או "היתרי בנייה שניתנו במקרקעין"). פירוט ההיתרים הרלוונטיים על פי השומות שהוגשו לתיק.
## כללי ציון סתירות בין שמאים (קריטי):
- אם שני שמאים או יותר מסרו מידע שונה על אותה תכנית או היתר — חובה לסמן זאת במפורש בנוסח ניטרלי, למשל:
> "יצוין כי שמאי הוועדה ציין כי תכנית פלונית חלה על המקרקעין במלואה, בעוד שמאי העורר סבר כי חלקה של התכנית בלבד חל"
- **כשקיים שמאי מכריע** — השומה שלו היא הקובעת עובדתית. סמן זאת במפורש בסוף הדיון בסתירה, בנוסח: "ואולם, השמאי המכריע קבע כי..." או "השמאי המכריע, שבחן את עמדות הצדדים, הכריע כי...". הצג את עמדת המכריע **אחרונה** כדי שההקשר יבנה אליה.
- השתמש בתוויות הצד המדויקות: "שמאי הוועדה המקומית", "שמאי העורר", "שמאי מכריע" — ולא בשמות פרטיים אלא אם נדרש לבהירות.
- אין להכריע בסתירה משפטית או להגיע למסקנה נורמטיבית בבלוק זה — ההכרעה המשפטית (אם נדרשת) תבוא בבלוק י. כאן מציגים רק את הממצא העובדתי כפי שהוא, כולל הכרעת המכריע העובדתית.
- אם אין סתירה — אין להזכיר זאת.
## כללים נוספים:
- אין ניתוח מעמיק (→ בלוק י), אין הכרעה בין פרשנויות
- מספור רציף
- אם אין שומות בתיק — דווח רק על תכניות שזוהו ממסמכים אחרים, וציין במשפט אחד שלא הוגשו שומות
## פרטי התיק:
{case_context}
## תכניות שזוהו (ממטא-דאטה של מסמכים):
{plans_context}
## עובדות שמאיות שחולצו (תכניות + היתרים, פרק לכל שמאי):
{appraiser_facts_context}
## סתירות שזוהו בין שמאים (חובה לסמן בנוסח):
{appraiser_conflicts_context}
## חומרי מקור:
{source_context}""",
"block-yod": """כתוב את בלוק הדיון וההכרעה (בלוק י) של החלטת ועדת ערר.
## זהו הבלוק הקריטי ביותר — ליבת ההחלטה (ratio decidendi).
## אורך נדרש: **2,000-4,000 מילים לפחות**. זהו הבלוק הארוך ביותר בהחלטה (35-50%).
{methodology_guidance}
{content_checklist}
## כללים נוספים:
- **ללא כפילות** — הפנה לבלוקים קודמים: "כאמור בסעיף X לעיל"
- **מספור רציף** — המשך מספור מהבלוק הקודם
- מותרות כותרות-משנה כשיש נושאים נפרדים לחלוטין
## כיוון מאושר (חובה):
{direction_context}
## מבנה לפי תוצאה:
{structure_guidance}
## טענות:
{claims_context}
## חומרי מקור:
{source_context}
## פסיקה רלוונטית (צטט מכאן ומהידע הכללי שלך):
{precedents_context}
## סגנון דפנה:
{style_context}""",
"block-yod-alef": """כתוב את בלוק הסיכום (בלוק יא, "סוף דבר") של החלטת ועדת ערר.
## כללים:
- כותרת: "סוף דבר" או "סיכום"
- תוצאה ברורה: "הערר נדחה" / "הערר מתקבל" / "הערר מתקבל באופן חלקי"
- הוראות אופרטיביות חד-משמעיות
- אין חזרה על נימוקים — ההנמקה כבר בדיון
- מספור רציף
## מבנה לפי תוצאה:
- דחייה: "הערר נדחה" + תתי-סעיפים + פסקה חמה (רישוי בלבד)
- קבלה: "הערר מתקבל בכפוף ל..." + פרוזה
- קבלה חלקית: "הערר מתקבל באופן חלקי" + 2-3 הוראות אופרטיביות
## כיוון ותוצאה:
{direction_context}
## בלוקים קודמים (דיון):
{discussion_context}""",
}
# Discussion structure by outcome
STRUCTURE_GUIDANCE = {
"rejected": "דחייה — שכבות הגנה (concentric circles): טענה ראשית → נדחית, טענה חלופית → נדחית, חיזוק.",
"accepted": "קבלה — נימוק-נימוק: כל נימוק = CREAC מלא, בניית שכנוע הדרגתי.",
"partial": "קבלה חלקית — מיפוי מתחים: מה מתקבל ולמה, מה נדחה ולמה, איזון.",
}
async def write_block(
case_id: UUID,
block_id: str,
instructions: str = "",
) -> dict:
"""כתיבת בלוק יחיד בהחלטה.
Args:
case_id: מזהה התיק
block_id: מזהה הבלוק (block-alef, block-he, block-yod, ...)
instructions: הנחיות נוספות
Returns:
dict עם content, word_count, block_id, generation_type
"""
if block_id not in BLOCK_CONFIG:
raise ValueError(f"Unknown block: {block_id}")
block_cfg = BLOCK_CONFIG[block_id]
case = await db.get_case(case_id)
if not case:
raise ValueError(f"Case {case_id} not found")
decision = await db.get_decision_by_case(case_id)
# Template blocks
if block_id in TEMPLATE_WRITERS:
content = TEMPLATE_WRITERS[block_id](case, decision)
r = _build_result(block_id, content, block_cfg)
r["sources"] = {"document_ids": [], "claim_ids": [], "case_law_ids": []}
return r
# AI-generated blocks
prompt_template = BLOCK_PROMPTS.get(block_id)
if not prompt_template:
raise ValueError(f"No prompt template for {block_id}")
# Build context components
case_context = _build_case_context(case, decision)
source_context = await _build_source_context(case_id, block_id)
claims_context = await _build_claims_context(case_id)
direction_context = _build_direction_context(decision)
plans_context = await _build_plans_context(case_id)
precedents_context, _precedent_case_law_ids = await _build_precedents_context(case_id, block_id)
style_context = await _build_style_context()
discussion_context = await _build_previous_blocks_context(case_id, decision)
appraiser_facts_context = await _build_appraiser_facts_context(case_id)
appraiser_conflicts_context = await _build_appraiser_conflicts_context(case_id)
post_hearing_context = await _build_post_hearing_context(case_id)
outcome = (decision or {}).get("outcome", "rejected")
structure_guidance = STRUCTURE_GUIDANCE.get(outcome, "")
# Content checklist — tells block-yod WHAT topics to cover
content_checklist = ""
methodology_guidance = ""
if block_id == "block-yod":
content_checklist = get_content_checklist(
appeal_type=case.get("appeal_type", ""),
subject=case.get("subject", ""),
subject_categories=case.get("subject_categories", []),
)
# Methodology guidance — tells block-yod HOW to reason (universal, not case-specific)
methodology_guidance = get_methodology_summary()
# Format prompt — per Anthropic long-context best practices:
# Place source documents FIRST (top of prompt), instructions LAST.
# "Queries at the end can improve response quality by up to 30%"
formatted_prompt = prompt_template.format(
case_context=case_context,
source_context=source_context,
claims_context=claims_context,
direction_context=direction_context,
plans_context=plans_context,
precedents_context=precedents_context,
style_context=style_context,
discussion_context=discussion_context,
structure_guidance=structure_guidance,
content_checklist=content_checklist,
methodology_guidance=methodology_guidance,
appraiser_facts_context=appraiser_facts_context,
appraiser_conflicts_context=appraiser_conflicts_context,
post_hearing_context=post_hearing_context,
)
# source_context is already embedded inside formatted_prompt via {source_context} in the
# template. Do NOT prepend it again — doing so doubles the prompt size (was 465K chars).
prompt = formatted_prompt
if instructions:
prompt += f"\n\n## הנחיות נוספות:\n{instructions}"
# Block י requires approved direction
if block_id == "block-yod":
dir_doc = (decision or {}).get("direction_doc") or {}
if not dir_doc.get("approved"):
raise ValueError("לא ניתן לכתוב בלוק דיון ללא כיוון מאושר. הפעל brainstorm → approve_direction קודם.")
# Guard against context overflow before calling claude -p.
# Sonnet: 200K context → ~800K chars max; Opus: 200K context → same.
# In practice the CLI has crashed on prompts above ~400K chars, so use
# that as a conservative ceiling (well below the token limit).
_MAX_PROMPT_CHARS = 400_000
if len(prompt) > _MAX_PROMPT_CHARS:
raise RuntimeError(
f"Prompt too large for {block_id}: {len(prompt):,} chars "
f"(limit {_MAX_PROMPT_CHARS:,}). "
f"source_context: {len(source_context):,} chars. "
f"Reduce documents or call extract_appraiser_facts first."
)
# Call Claude via Claude Code session (no API)
model_key = block_cfg["model"]
timeout = claude_session.LONG_TIMEOUT if model_key == "opus" else claude_session.DEFAULT_TIMEOUT
content = await claude_session.query(prompt, timeout=timeout)
sources = await _collect_block_sources(case_id, block_id)
sources["case_law_ids"] = _precedent_case_law_ids
result = _build_result(block_id, content, block_cfg)
result["sources"] = sources
return result
def _build_result(block_id: str, content: str, block_cfg: dict) -> dict:
word_count = len(content.split())
return {
"block_id": block_id,
"block_index": block_cfg["index"],
"title": block_cfg["title"],
"content": content,
"word_count": word_count,
"generation_type": block_cfg["gen_type"],
"model_used": block_cfg["model"],
"temperature": block_cfg["temp"],
}
async def _collect_block_sources(case_id: UUID, block_id: str) -> dict:
"""Deterministic source ids available to a block's generation (GAP-19).
document_ids: case documents matching the block's allowed doc-types.
claim_ids: extracted claims for the case. (case_law_ids are captured
separately from the precedent search inside write_block.)
"""
allowed = _BLOCK_DOC_TYPES.get(block_id, []) # [] = all docs; None = no source docs
if allowed is None:
docs = [] # mirror _build_source_context: this block consumes no raw source docs
else:
docs = await db.list_documents(case_id)
if allowed:
docs = [d for d in docs if d.get("doc_type") in allowed]
claims = await db.get_claims(case_id)
return {
"document_ids": [str(d["id"]) for d in docs],
"claim_ids": [str(c["id"]) for c in claims],
}
# ── Context builders ──────────────────────────────────────────────
def _build_case_context(case: dict, decision: dict | None) -> str:
outcome = (decision or {}).get("outcome", "")
outcome_heb = {"rejected": "דחייה", "accepted": "קבלה", "partial": "קבלה חלקית"}.get(outcome, "")
return f"""- מספר תיק: {case['case_number']}
- כותרת: {case.get('title', '')}
- עוררים: {', '.join(case.get('appellants', []))}
- משיבים: {', '.join(case.get('respondents', []))}
- נושא: {case.get('subject', '')}
- כתובת: {case.get('property_address', '')}
- סוג ערר: {case.get('appeal_type', '')}
- תוצאה: {outcome_heb}"""
# Which doc_types are relevant per block.
# None → skip source docs entirely (block uses other context, e.g. claims_context)
# [] → include all doc types (default for unspecified blocks)
# [..] → include only the listed doc_type values
_BLOCK_DOC_TYPES: dict[str, list[str] | None] = {
"block-he": None, # only case_context needed; no full docs
"block-vav": ["appeal", "protocol"], # כתב ערר + פרוטוקול ועדה
"block-zayin": None, # claims_context is sufficient
"block-chet": ["protocol"], # פרוטוקול + השלמות טיעון
"block-tet": ["appraisal"], # שומות בלבד
# block-yod, block-yod-alef, block-he etc. default → all docs
}
async def _build_source_context(case_id: UUID, block_id: str) -> str:
"""Get document texts for the block, filtered by relevance.
Per Anthropic best practices: send full source documents, not truncated excerpts.
Per-block filtering prevents context overflow on large cases (9+ docs).
"""
allowed = _BLOCK_DOC_TYPES.get(block_id, []) # [] sentinel = not in map → all docs
if allowed is None:
return "" # this block doesn't need raw source docs
docs = await db.list_documents(case_id)
context_parts = []
for doc in docs:
if allowed and doc["doc_type"] not in allowed:
continue
text = await db.get_document_text(UUID(doc["id"]))
if text:
context_parts.append(f"--- מסמך: {doc['title']} ({doc['doc_type']}) ---\n{text}")
return "\n\n".join(context_parts) if context_parts else "(אין מסמכים)"
async def _build_claims_context(case_id: UUID) -> str:
claims = await db.get_claims(case_id)
if not claims:
return "(לא חולצו טענות)"
# Filter out claims from block-zayin (decision summary) — use only
# claims extracted from original pleadings (appeal, response, etc.)
source_claims = [c for c in claims if c.get("source_document", "") != "block-zayin"]
if not source_claims:
# Fallback to all claims if no source claims exist
source_claims = claims
lines = []
current_role = ""
role_heb = {"appellant": "טענות העוררים", "respondent": "טענות המשיבים",
"committee": "עמדת הוועדה המקומית", "permit_applicant": "עמדת מבקשי ההיתר"}
claim_num = 0
for c in source_claims:
if c["party_role"] != current_role:
current_role = c["party_role"]
lines.append(f"\n### {role_heb.get(current_role, current_role)}")
claim_num += 1
lines.append(f"טענה #{claim_num}: {c['claim_text'][:400]}")
lines.append(f"\n**סה\"כ {claim_num} טענות. ענה על כל טענה מהותית; טענות [bundle] — אגד; טענות [skip] — ציון קצר בלבד.**")
return "\n".join(lines)
def _build_direction_context(decision: dict | None) -> str:
if not decision:
return "(לא הוגדר כיוון)"
dir_doc = decision.get("direction_doc") or {}
if not dir_doc.get("approved"):
return "(כיוון לא אושר)"
parts = []
outcome_heb = dir_doc.get("outcome_hebrew", "")
if outcome_heb:
parts.append(f"תוצאה: {outcome_heb}")
reasoning = dir_doc.get("reasoning", "")
if reasoning:
parts.append(f"נימוק: {reasoning}")
direction = dir_doc.get("selected_direction")
if direction:
parts.append(f"כיוון נבחר: {direction.get('name', '')}")
for r in direction.get("reasoning", []):
parts.append(f" - {r}")
for p in direction.get("precedents", []):
parts.append(f" פסיקה: {p}")
notes = dir_doc.get("additional_notes", "")
if notes:
parts.append(f"הערות: {notes}")
return "\n".join(parts) if parts else "(אין מסמך כיוון)"
async def _build_plans_context(case_id: UUID) -> str:
"""Get plan references from document metadata."""
docs = await db.list_documents(case_id)
plans = set()
for doc in docs:
metadata = doc.get("metadata") or {}
if isinstance(metadata, str):
metadata = json.loads(metadata)
refs = metadata.get("references", {})
for p in refs.get("plans", []):
plans.add(p.get("plan_name", ""))
if plans:
return "\n".join(f"- {p}" for p in sorted(plans) if p)
return "(לא זוהו תכניות)"
APPRAISER_SIDE_LABEL_HE = {
"committee": "שמאי הוועדה המקומית",
"appellant": "שמאי העורר",
"deciding": "שמאי מכריע",
"": "שמאי (לא תויג)",
}
# Sort key: committee → appellant → deciding → untagged. This matches the order
# used by db.detect_appraiser_conflicts so the deciding appraiser is last —
# i.e. the conclusion reads most naturally ("...and the deciding appraiser ruled...").
_SIDE_ORDER = {"committee": 1, "appellant": 2, "deciding": 3, "": 4}
def _side_label(side: str) -> str:
return APPRAISER_SIDE_LABEL_HE.get(side or "", APPRAISER_SIDE_LABEL_HE[""])
async def _build_appraiser_facts_context(case_id: UUID) -> str:
"""Group appraiser_facts by side (then name), list each appraiser's plans+permits."""
facts = await db.list_appraiser_facts(case_id)
if not facts:
return "(לא חולצו עובדות שמאיות. הרץ extract_appraiser_facts.)"
# (side, name) → {plan: [...], permit: [...]}
groups: dict[tuple[str, str], dict[str, list[dict]]] = {}
for f in facts:
key = (f.get("appraiser_side", "") or "", f["appraiser_name"])
bucket = groups.setdefault(key, {"plan": [], "permit": []})
bucket[f["fact_type"]].append(f)
ordered_keys = sorted(groups.keys(), key=lambda k: (_SIDE_ORDER.get(k[0], 9), k[1]))
lines: list[str] = []
for side, name in ordered_keys:
lines.append(f"\n### {_side_label(side)}{name}")
for label, key in (("תכניות", "plan"), ("היתרים", "permit")):
items = groups[(side, name)][key]
if not items:
continue
lines.append(f"**{label}:**")
for item in items:
details = item.get("details") or {}
ident = item["identifier"]
scope = (details.get("scope") or "").strip()
date_s = (details.get("date") or "").strip()
status = (details.get("status") or "").strip()
quote = (details.get("raw_quote") or "").strip()
bits = [ident]
if date_s:
bits.append(f"תאריך: {date_s}")
if status:
bits.append(f"סטטוס: {status}")
if scope:
bits.append(f"היקף: {scope}")
line = " | ".join(bits)
if quote:
line += f"\n ציטוט: \"{quote[:200]}\""
lines.append(f"- {line}")
return "\n".join(lines)
async def _build_appraiser_conflicts_context(case_id: UUID) -> str:
"""Render conflict groups so the prompt can quote them in the body.
Entries arrive pre-ordered from the DB by side (committee→appellant→deciding).
When a deciding appraiser exists, the prompt must treat their view as the
governing factual determination.
"""
conflicts = await db.detect_appraiser_conflicts(case_id)
if not conflicts:
return "(אין סתירות בין שמאים)"
type_label = {"plan": "תכנית", "permit": "היתר"}
lines: list[str] = []
for c in conflicts:
has_deciding = any(e.get("appraiser_side") == "deciding" for e in c["entries"])
header = f"\n### סתירה — {type_label.get(c['fact_type'], c['fact_type'])}: {c['identifier']}"
if has_deciding:
header += " _(יש שמאי מכריע — עמדתו קובעת)_"
lines.append(header)
for entry in c["entries"]:
side = entry.get("appraiser_side", "") or ""
details = entry.get("details") or {}
scope = (details.get("scope") or "").strip()
status = (details.get("status") or "").strip()
quote = (details.get("raw_quote") or "").strip()
marker = "" if side == "deciding" else ""
parts = [f"**{marker}{_side_label(side)}{entry['appraiser_name']}**"]
if status:
parts.append(f"סטטוס: {status}")
if scope:
parts.append(f"היקף: {scope}")
line = " | ".join(parts)
if quote:
line += f"\n ציטוט: \"{quote[:200]}\""
lines.append(f"- {line}")
return "\n".join(lines)
async def _build_post_hearing_context(case_id: UUID) -> str:
"""List documents flagged as submitted after the hearing.
Convention: documents.metadata.is_post_hearing == True.
"""
docs = await db.list_documents(case_id)
items: list[dict] = []
for d in docs:
meta = d.get("metadata") or {}
if isinstance(meta, str):
meta = json.loads(meta)
if not meta.get("is_post_hearing"):
continue
items.append({
"title": d.get("title", ""),
"doc_type": d.get("doc_type", ""),
"submitted_on": meta.get("submitted_on", ""),
"kind": meta.get("post_hearing_kind", ""), # "supplementary_brief" | "settlement_proposal" | ...
})
if not items:
return "(לא הוגשו מסמכים לאחר הדיון, או שהם לא סומנו כ-post_hearing)"
lines: list[str] = []
for it in items:
meta_bits = []
if it["submitted_on"]:
meta_bits.append(f"הוגש: {it['submitted_on']}")
if it["kind"]:
meta_bits.append(f"סוג: {it['kind']}")
if it["doc_type"]:
meta_bits.append(f"doc_type={it['doc_type']}")
meta_str = f" ({', '.join(meta_bits)})" if meta_bits else ""
lines.append(f"- {it['title']}{meta_str}")
return "\n".join(lines)
async def _build_precedents_context(case_id: UUID, block_id: str) -> tuple[str, list[str]]:
"""Search for similar precedent paragraphs from other decisions and case law."""
parts = []
case_law_ids: list[str] = []
try:
case = await db.get_case(case_id)
case_number = case.get("case_number", "") if case else ""
subject = case.get("subject", "") if case else ""
query = f"דיון משפטי בנושא {subject}" if subject else "דיון משפטי ועדת ערר"
query_emb = await embeddings.embed_query(query)
# Search 1: paragraph_embeddings (from other decisions by Dafna)
para_results = await db.search_similar_paragraphs(
query_embedding=query_emb, limit=10, block_type="block-yod",
)
# Filter out same case
para_results = [r for r in para_results if r.get("case_number", "") != case_number]
for r in para_results[:4]:
parts.append(
f"[החלטת {r.get('case_number', '?')}{r.get('case_title', '')}, "
f"בלוק {r.get('block_type', '')}]\n{r['content'][:500]}"
)
# Search 2: case_law_embeddings (precedent case law)
pool = await db.get_pool()
async with pool.acquire() as conn:
caselaw_rows = await conn.fetch(
"""SELECT cl.id, cl.case_number, cl.case_name, cl.court, cl.summary, cl.key_quote,
1 - (cle.embedding <=> $1) AS score
FROM case_law_embeddings cle
JOIN case_law cl ON cl.id = cle.case_law_id
ORDER BY cle.embedding <=> $1
LIMIT 5""",
query_emb,
)
for r in caselaw_rows[:3]:
case_law_ids.append(str(r["id"]))
text = r["key_quote"] or r["summary"] or ""
if text:
parts.append(
f"[פסיקה: {r['case_number']} {r['case_name']} ({r.get('court', '')})] "
f"score={r['score']:.3f}\n{text[:400]}"
)
except Exception as e:
logger.warning("Failed to fetch precedents: %s", e)
return ("\n\n".join(parts) if parts else "(אין תקדימים)"), case_law_ids
async def _build_style_context() -> str:
"""Build comprehensive style guide from DB patterns + SKILL.md rules.
Per Anthropic: explicit style instructions reduce generic output.
"""
lines = []
# Core style rules (from SKILL.md analysis)
lines.append("""## כללי סגנון דפנה תמיר — חובה:
### טון:
- ערר רישוי (1xxx): חם יחסית, עם אלמנטים אנושיים
- ערר השבחה (8xxx): קר, יבש, מקצועי
- גוף ראשון רבים: "אנו סבורים", "מצאנו כי", "לדעתנו"
- ישיר ובהיר — לא אקדמי ולא מסורבל
### ביטויים ייחודיים (חובה להשתמש):
- "לפנינו..." (פתיחה)
- "כידוע..." (הצגת עקרון ידוע)
- "ברי כי..." / "ודוק..." (הדגשה)
- "אין בידנו לקבל" (דחיית טענה)
- "בטענה זו מצאנו טעם" (קבלת טענה)
- "יחד עם זאת" (מעבר לאיזון)
- "למעלה מן הצורך" / "נבקש שלא לצאת בחסר" (הרחבה)
- "הדברים מתחדדים שעה ש..." (חידוד)
- "מחד... מאידך... על כן..." (איזון לפני הכרעה)
- "לאור כל האמור לעיל" (סיכום)
- "ניתנה פה אחד היום" (סיום)
### מבנה דיון:
- אסה רציפה ללא כותרות משנה (חריג: נושאים נפרדים לחלוטין)
- מסקנה בפתיחה, לא בסוף
- מעברים טקסטואליים, לא כותרות
- ניטרול טענות חלשות לפני ניתוח מעמיק
- ציטוטי פסיקה כבלוקים מוגדלים
### טענות צדדים:
- עוררים: "העוררים טוענים כי...", "לטענתם...", "עוד ציינו כי..."
- ועדה: "הוועדה המקומית הציגה/הבהירה/הוסיפה כי..."
- מבקשי היתר: "מבקשי ההיתר דוחים מכל וכל...", "לטענתם...", "מבקשי ההיתר מציינים כי..."
""")
# DB patterns (actual examples from Dafna's decisions)
patterns = await db.get_style_patterns()
if patterns:
lines.append("### דפוסים שחולצו מהחלטות קודמות:")
grouped: dict[str, list] = {}
for p in patterns:
grouped.setdefault(p["pattern_type"], []).append(p)
type_names = {
"opening_formula": "פתיחה",
"transition": "מעברים",
"characteristic_phrase": "ביטויים אופייניים",
"closing_formula": "סיום",
"citation_style": "ציטוט",
}
for ptype in ["characteristic_phrase", "transition", "opening_formula", "closing_formula"]:
items = grouped.get(ptype, [])
if items:
lines.append(f"\n**{type_names.get(ptype, ptype)}:**")
for item in items[:8]:
lines.append(f"- {item['pattern_text']}")
return "\n".join(lines)
async def _build_previous_blocks_context(case_id: UUID, decision: dict | None) -> str:
"""Get content of previously written blocks."""
if not decision:
return "(אין בלוקים קודמים)"
pool = await db.get_pool()
async with pool.acquire() as conn:
rows = await conn.fetch(
"""SELECT block_id, title, content, word_count
FROM decision_blocks
WHERE decision_id = $1 AND word_count > 0
ORDER BY block_index""",
UUID(decision["id"]),
)
if not rows:
return "(אין בלוקים קודמים)"
parts = []
for r in rows:
content = r["content"][:2000]
parts.append(f"### {r['title']} ({r['block_id']})\n{content}")
return "\n\n".join(parts)
# ── Context-only mode (for Claude Code to write) ─────────────────
async def get_block_context(case_id: UUID, block_id: str, instructions: str = "") -> dict:
"""Return full context package for a block WITHOUT calling Claude API.
Claude Code (or any external writer) uses this context to write the block,
then saves it via save_block_content.
"""
if block_id not in BLOCK_CONFIG:
raise ValueError(f"Unknown block: {block_id}")
block_cfg = BLOCK_CONFIG[block_id]
case = await db.get_case(case_id)
if not case:
raise ValueError(f"Case {case_id} not found")
decision = await db.get_decision_by_case(case_id)
# Template blocks — return content directly
if block_id in TEMPLATE_WRITERS:
content = TEMPLATE_WRITERS[block_id](case, decision)
return {
"block_id": block_id,
"title": block_cfg["title"],
"mode": "template",
"content": content,
}
# Build all context components
prompt_template = BLOCK_PROMPTS.get(block_id, "")
case_context = _build_case_context(case, decision)
source_context = await _build_source_context(case_id, block_id)
claims_context = await _build_claims_context(case_id)
direction_context = _build_direction_context(decision)
plans_context = await _build_plans_context(case_id)
precedents_context, _ = await _build_precedents_context(case_id, block_id)
style_context = await _build_style_context()
discussion_context = await _build_previous_blocks_context(case_id, decision)
appraiser_facts_context = await _build_appraiser_facts_context(case_id)
appraiser_conflicts_context = await _build_appraiser_conflicts_context(case_id)
post_hearing_context = await _build_post_hearing_context(case_id)
outcome = (decision or {}).get("outcome", "rejected")
structure_guidance = STRUCTURE_GUIDANCE.get(outcome, "")
# Content checklist + methodology for block-yod
content_checklist = ""
methodology_guidance = ""
if block_id == "block-yod":
content_checklist = get_content_checklist(
appeal_type=case.get("appeal_type", ""),
subject=case.get("subject", ""),
subject_categories=case.get("subject_categories", []),
)
methodology_guidance = get_methodology_summary()
formatted_prompt = prompt_template.format(
case_context=case_context,
source_context=source_context,
claims_context=claims_context,
direction_context=direction_context,
plans_context=plans_context,
precedents_context=precedents_context,
style_context=style_context,
discussion_context=discussion_context,
structure_guidance=structure_guidance,
content_checklist=content_checklist,
methodology_guidance=methodology_guidance,
appraiser_facts_context=appraiser_facts_context,
appraiser_conflicts_context=appraiser_conflicts_context,
post_hearing_context=post_hearing_context,
)
if instructions:
formatted_prompt += f"\n\n## הנחיות נוספות:\n{instructions}"
# Block י requires approved direction
if block_id == "block-yod":
dir_doc = (decision or {}).get("direction_doc") or {}
if not dir_doc.get("approved"):
raise ValueError("לא ניתן לכתוב בלוק דיון ללא כיוון מאושר.")
return {
"block_id": block_id,
"title": block_cfg["title"],
"mode": "context",
"prompt": formatted_prompt,
"source_documents": source_context,
"claims": claims_context,
"direction": direction_context,
"precedents": precedents_context,
"style_guide": style_context,
"previous_blocks": discussion_context,
}
async def save_block_content(case_id: UUID, block_id: str, content: str) -> dict:
"""Save block content written by Claude Code (or any external writer).
Saves to DB and also writes/updates the draft file on disk.
"""
if block_id not in BLOCK_CONFIG:
raise ValueError(f"Unknown block: {block_id}")
block_cfg = BLOCK_CONFIG[block_id]
decision = await db.get_decision_by_case(case_id)
if not decision:
decision = await db.create_decision(case_id=case_id)
result = _build_result(block_id, content, block_cfg)
result["generation_type"] = "claude-code"
result["model_used"] = "claude-code"
await store_block(UUID(decision["id"]), result)
await db.mark_blocks_stale(case_id, False)
# Also write/update the draft file on disk
await _update_draft_file(case_id, UUID(decision["id"]))
return result
async def _update_draft_file(case_id: UUID, decision_id: UUID) -> None:
"""Rebuild drafts/decision.md from all blocks in DB."""
from pathlib import Path
case = await db.get_case(case_id)
if not case:
return
case_dir = config.find_case_dir(case["case_number"])
draft_dir = case_dir / "drafts"
draft_dir.mkdir(parents=True, exist_ok=True)
pool = await db.get_pool()
async with pool.acquire() as conn:
rows = await conn.fetch(
"SELECT content FROM decision_blocks WHERE decision_id = $1 AND content != '' ORDER BY block_index",
decision_id,
)
draft_path = draft_dir / "decision.md"
draft_path.write_text("\n\n".join(row["content"] for row in rows if row["content"]), encoding="utf-8")
logger.info("Draft file updated: %s (%d blocks)", draft_path, len(rows))
# ── Renumbering ───────────────────────────────────────────────────
async def renumber_all_blocks(decision_id: UUID) -> dict:
"""מספור רציף מחדש של כל הבלוקים בהחלטה.
עובר על כל הבלוקים לפי סדר, ומחליף את כל המספורים
(1. 2. 3. או **1.** **2.**) לרצף אחד רציף.
"""
pool = await db.get_pool()
async with pool.acquire() as conn:
rows = await conn.fetch(
"""SELECT block_id, block_index, content, word_count
FROM decision_blocks WHERE decision_id = $1
ORDER BY block_index""",
decision_id,
)
current_num = 1
updated = 0
# Blocks that shouldn't be numbered
skip_blocks = {"block-alef", "block-bet", "block-gimel", "block-dalet", "block-yod-bet"}
for row in rows:
if row["block_id"] in skip_blocks or not row["content"]:
continue
content = row["content"]
# Replace numbered paragraphs: "N." or "**N.**" or "**N.**" at line start
def replace_num(match):
nonlocal current_num
prefix = match.group(1) or "" # bold markers
suffix = match.group(3) or "" # bold markers
result = f"{prefix}{current_num}{suffix}"
current_num += 1
return result
new_content = re.sub(
r'^(\*\*)?(\d+)(\.?\*?\*?\.)',
replace_num,
content,
flags=re.MULTILINE,
)
if new_content != content:
async with pool.acquire() as conn:
await conn.execute(
"UPDATE decision_blocks SET content = $1, updated_at = now() WHERE decision_id = $2 AND block_id = $3",
new_content, decision_id, row["block_id"],
)
updated += 1
return {"total_paragraphs": current_num - 1, "blocks_updated": updated}
# ── Store block ───────────────────────────────────────────────────
async def store_block(decision_id: UUID, block_result: dict) -> None:
"""שמירת בלוק ב-DB (upsert)."""
pool = await db.get_pool()
async with pool.acquire() as conn:
await conn.execute(
"""INSERT INTO decision_blocks
(decision_id, block_id, block_index, title, content, word_count,
generation_type, model_used, temperature, status)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, 'draft')
ON CONFLICT (decision_id, block_id) DO UPDATE SET
content = EXCLUDED.content,
word_count = EXCLUDED.word_count,
generation_type = EXCLUDED.generation_type,
model_used = EXCLUDED.model_used,
temperature = EXCLUDED.temperature,
status = 'draft',
updated_at = now()""",
decision_id,
block_result["block_id"],
block_result["block_index"],
block_result["title"],
block_result["content"],
block_result["word_count"],
block_result["generation_type"],
block_result["model_used"],
block_result["temperature"],
)
async def write_and_store_block(
case_id: UUID,
block_id: str,
instructions: str = "",
) -> dict:
"""כתיבת בלוק ושמירה ב-DB."""
decision = await db.get_decision_by_case(case_id)
if not decision:
# Create decision if not exists
decision = await db.create_decision(case_id=case_id)
result = await write_block(case_id, block_id, instructions)
await store_block(UUID(decision["id"]), result)
await audit.log_action_safe(
"write_block", case_id=case_id,
details={
"decision_id": str(decision["id"]),
"block_id": block_id,
"model_used": result.get("model_used"),
"generation_type": result.get("generation_type"),
"sources": result.get("sources", {}),
},
)
await db.mark_blocks_stale(case_id, False)
return result