Files
legal-ai/mcp-server/src/legal_mcp/services/block_writer.py
Chaim 0d8cc31a2b
All checks were successful
G12 Leak-Guard / leak-guard (pull_request) Successful in 5s
feat(storage): seal INV-STG1 write path — 15 dual-write seals + CI leak-guard + tripwire
אחרי ה-cutover ל-s3-only, אודיט מצא 15 אתרי-כתיבת-בלוב שעוקפים את storage.py (uploads/
finalize/exports/training/research-backup/precedents/bulletins/draft) — קובץ ינחת
בתיקיות-הישנות אך **לא** ב-MinIO → יאבד בניקוי, לא מוגש, לא מגובה. ה-pipeline (ingest/
extract) עדיין קורא לפי file_path מהדיסק, אז ביטול-מוחלט של כתיבה-לדיסק דורש read-wiring
מלא (Phase 2, משימה נפרדת). תיקון בטוח עכשיו = **dual-write seal**.

- storage.py: `mirror`/`mirror_file` (+ sync) — best-effort persist ל-S3 כשה-backend
  s3/dual (no-op ב-filesystem; כשל S3 נרשם, לא שובר request — DualBackend philosophy).
- web/app.py: helpers `_seal_blob`/`_seal_blob_file` + 14 אתרים אטומים (storage.mirror
  אחרי כתיבת-הדיסק; הדיסק נשאר ל-pipeline). block_writer.py: draft אטום (async).
- **CI leak-guard** (test_storage_write_leak_guard): נכשל על כל כתיבת-בלוב-לדיסק
  (write_bytes/write_text/shutil.copy*/open(wb)) ב-web/+services ללא מרקר `# noqa: STG1`.
  כל ה-benign (fallbacks/tmp/staging/git-metadata/flag/state) מסומנים עם נימוק. storage.py
  מוחרג (הוא המימוש).
- **tripwire** (scripts/storage_leak_tripwire.py): ניטור-ריצה — בלובים בדיסק שלא ב-MinIO
  (json-key match, bucket per-file). אומת חי: 0 דליפות.

invariants: INV-STG1 (כל I/O דרך storage / ממורר אליו) · INV-STG6 · feedback_silent_swallow
(mirror רושם warning, לא bare-except). Phase 2 (read-wire ה-pipeline → להפיל את עותק-הדיסק)
= follow-up. tests: 4 mirror + 1 leak-guard + 6 serve_blob + 18 storage קיימות עוברות.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-11 19:57:12 +00:00

1252 lines
55 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""מנוע כתיבת בלוקים להחלטת ועדת ערר.
מייצר טקסט בפועל לכל בלוק (ה-יב) בהתבסס על:
- block-schema.md (פרמטרים, constraints, מבנה)
- SKILL.md (סגנון דפנה)
- חומרי המקור (מסמכים, טענות, פסיקה)
- מסמך כיוון (חובה לבלוק י)
בלוקים א-ד ויב = template-fill (ללא AI).
בלוקים ה-יא = AI generation עם Claude.
"""
from __future__ import annotations
import json
import logging
import re
from datetime import date
from uuid import UUID
from pathlib import Path
from legal_mcp import config
from legal_mcp.services import db, embeddings, claude_session, audit, storage
from legal_mcp.services.lessons import (
OUTCOME_LABELS_HE,
PRACTICE_AREA_OVERRIDES,
canonical_outcome,
get_content_checklist,
get_methodology_summary,
)
logger = logging.getLogger(__name__)
# ── Block configuration ───────────────────────────────────────────
# Output token limits per Anthropic docs:
# Opus 4.7: up to 128K output tokens (new tokenizer — ~35% more tokens)
# Sonnet 4.6: up to 64K output tokens
# Streaming required when max_tokens > 21,333
BLOCK_CONFIG = {
"block-alef": {"index": 1, "title": "כותרת מוסדית", "gen_type": "template-fill", "temp": 0, "model": "script"},
"block-bet": {"index": 2, "title": "הרכב הוועדה", "gen_type": "template-fill", "temp": 0, "model": "script"},
"block-gimel":{"index": 3, "title": "צדדים", "gen_type": "template-fill", "temp": 0, "model": "script"},
"block-dalet":{"index": 4, "title": "החלטה", "gen_type": "template-fill", "temp": 0, "model": "script"},
"block-he": {"index": 5, "title": "פתיחה", "gen_type": "paraphrase", "temp": 0.2, "model": "sonnet", "max_tokens": 4096},
"block-vav": {"index": 6, "title": "רקע עובדתי", "gen_type": "reproduction", "temp": 0, "model": "sonnet", "max_tokens": 16384},
"block-zayin":{"index": 7, "title": "טענות הצדדים", "gen_type": "paraphrase", "temp": 0.1, "model": "sonnet", "max_tokens": 16384},
"block-chet": {"index": 8, "title": "הליכים", "gen_type": "reproduction", "temp": 0, "model": "sonnet", "max_tokens": 8192},
"block-tet": {"index": 9, "title": "תכניות חלות", "gen_type": "guided-synthesis", "temp": 0.2, "model": "opus", "max_tokens": 16384},
"block-yod": {"index": 10, "title": "דיון והכרעה", "gen_type": "rhetorical-construction", "temp": 0.4, "model": "opus", "max_tokens": 16384},
"block-yod-alef": {"index": 11, "title": "סיכום", "gen_type": "paraphrase", "temp": 0.1, "model": "sonnet", "max_tokens": 8192},
"block-yod-bet": {"index": 12, "title": "חתימות", "gen_type": "template-fill", "temp": 0, "model": "script"},
}
MODEL_MAP = {
"sonnet": "claude-sonnet-4-20250514",
"opus": "claude-opus-4-7",
}
# ── Template blocks (א-ד, יב) ────────────────────────────────────
def write_block_alef(case: dict, decision: dict | None = None) -> str:
"""כותרת מוסדית."""
return f"""מדינת ישראל
ועדת ערר לתכנון ולבנייה — מחוז ירושלים
ערר מס' {case['case_number']}"""
def write_block_bet(case: dict, decision: dict | None = None) -> str:
"""הרכב הוועדה."""
panel = (decision or {}).get("panel_members", [])
members_text = ""
if panel:
for m in panel:
members_text += f"\n{m}"
return f"""בפני:
עו"ד דפנה תמיר, יו"ר{members_text}"""
def write_block_gimel(case: dict, decision: dict | None = None) -> str:
"""צדדים."""
appellants = "\n".join(case.get("appellants", ["(לא צוין)"]))
respondents = "\n".join(case.get("respondents", ["(לא צוין)"]))
return f"""{appellants}
נגד
{respondents}"""
def write_block_dalet(case: dict, decision: dict | None = None) -> str:
"""כותרת החלטה."""
return "החלטה"
def write_block_yod_bet(case: dict, decision: dict | None = None) -> str:
"""חתימות."""
today = date.today().strftime("%d.%m.%Y")
return f"""ניתנה היום, {today}, פה אחד.
דפנה תמיר, עו"ד
יו"ר ועדת הערר"""
TEMPLATE_WRITERS = {
"block-alef": write_block_alef,
"block-bet": write_block_bet,
"block-gimel": write_block_gimel,
"block-dalet": write_block_dalet,
"block-yod-bet": write_block_yod_bet,
}
# ── AI-generated blocks (ה-יא) ───────────────────────────────────
BLOCK_PROMPTS = {
"block-he": """כתוב את בלוק הפתיחה (בלוק ה) של החלטת ועדת ערר.
## כללים:
- פתח ב"לפנינו ערר..." או "עניינה של החלטה זו..."
- הגדר "להלן" מרכזיים: הוועדה המקומית, התכנית/הבקשה, המגרש
- 1-2 סעיפים בלבד
- אין ניתוח, אין ערכי שיפוט, אין ציטוטים מצדדים
- מספור: 1.
## פרטי התיק:
{case_context}
## חומרי מקור:
{source_context}""",
"block-vav": """כתוב את בלוק הרקע העובדתי (בלוק ו, "פתח דבר") של החלטת ועדת ערר.
## כללים קריטיים:
- **רקע ניטרלי** — עובדות בלבד. אין ציטוטים ישירים מצדדים. אין מילות ערך/שיפוט ("חריג", "חטא", "בעייתי").
- סדר פנימי: מקרקעין → סביבה → היסטוריה תכנונית → מהות הבקשה → החלטת הוועדה → הגשת הערר
- סמן מיקומי תמונות: [📷 מיקום GIS], [📷 תשריט]
- ציטוט מפרוטוקול ועדה מקומית (אם יש) כ-blockquote
- מספור רציף מהבלוק הקודם
## פרטי התיק:
{case_context}
## חומרי מקור:
{source_context}""",
"block-zayin": """כתוב את בלוק טענות הצדדים (בלוק ז, "תמצית טענות הצדדים") של החלטת ועדת ערר.
## כללים קריטיים:
- **סנתז טענות דומות** — אל תרשום כל טענה בנפרד. קבץ טענות דומות לנושא אחד. למשל: כל הטענות על הודעות → סעיף אחד, כל הטענות על רכוש משותף → סעיף אחד.
- גוף שלישי: "העוררים טוענים כי...", "הוועדה המקומית ציינה כי..."
- **מבנה קבוע עם 3 חלקים:**
1. "טענות העוררים" — 8-12 סעיפים מקובצים לפי נושא
2. "עמדת הוועדה המקומית" — 5-8 סעיפים
3. "עמדת מבקשי ההיתר" (אם יש) — 5-10 סעיפים
- כותרת: "תמצית טענות הצדדים"
- נאמנות למקור — לא להמציא טענות, אבל כן לאחד ולסכם טענות חוזרות
- אין ניתוח, אין מסקנות, אין הערכה ("טענה חלשה/חזקה")
- רק מכתבי טענות מקוריים (לא השלמות טיעון)
- מספור רציף
- **יעד אורך: 800-1500 מילים**
## טענות שחולצו (קבץ טענות דומות לנושאים):
{claims_context}
## פרטי התיק:
{case_context}""",
"block-chet": """כתוב את בלוק ההליכים (בלוק ח, "ההליכים בפני ועדת הערר") של החלטת ועדת ערר.
## כללים:
- תיעוד כרונולוגי: דיון → סיור → השלמות טיעון → משא-ומתן לפשרה (אם היה) → החלטות ביניים
- תאריכים מדויקים
- אם בדיון עלו נקודות חדשות או הובהרו סוגיות משפטיות — ציין זאת במפורש בסעיף נפרד
- תוכן כל השלמת טיעון/הצעת פשרה בסעיף נפרד עם תאריך
- סמן תמונות מסיור: [📷 צילום מסיור]
- אין ניתוח או הערכה
- מספור רציף
## פרטי התיק:
{case_context}
## מסמכים שהוגשו לאחר הדיון (אם יש):
{post_hearing_context}
## חומרי מקור:
{source_context}""",
"block-tet": """כתוב את בלוק התכניות החלות (בלוק ט) של החלטת ועדת ערר, **כולל תת-פרק היתרים**.
## מבנה נדרש:
1. **תכניות חלות** — מבנה הירכי: תכניות ארציות → מחוזיות → מקומיות. ציטוט ישיר מהוראות תכנית עם **הדגשה** של מילים מכריעות.
2. **תת-פרק היתרים** — כותרת משנה "היתרים" (או "היתרי בנייה שניתנו במקרקעין"). פירוט ההיתרים הרלוונטיים על פי השומות שהוגשו לתיק.
## כללי ציון סתירות בין שמאים (קריטי):
- אם שני שמאים או יותר מסרו מידע שונה על אותה תכנית או היתר — חובה לסמן זאת במפורש בנוסח ניטרלי, למשל:
> "יצוין כי שמאי הוועדה ציין כי תכנית פלונית חלה על המקרקעין במלואה, בעוד שמאי העורר סבר כי חלקה של התכנית בלבד חל"
- **כשקיים שמאי מכריע** — השומה שלו היא הקובעת עובדתית. סמן זאת במפורש בסוף הדיון בסתירה, בנוסח: "ואולם, השמאי המכריע קבע כי..." או "השמאי המכריע, שבחן את עמדות הצדדים, הכריע כי...". הצג את עמדת המכריע **אחרונה** כדי שההקשר יבנה אליה.
- השתמש בתוויות הצד המדויקות: "שמאי הוועדה המקומית", "שמאי העורר", "שמאי מכריע" — ולא בשמות פרטיים אלא אם נדרש לבהירות.
- אין להכריע בסתירה משפטית או להגיע למסקנה נורמטיבית בבלוק זה — ההכרעה המשפטית (אם נדרשת) תבוא בבלוק י. כאן מציגים רק את הממצא העובדתי כפי שהוא, כולל הכרעת המכריע העובדתית.
- אם אין סתירה — אין להזכיר זאת.
## כללים נוספים:
- אין ניתוח מעמיק (→ בלוק י), אין הכרעה בין פרשנויות
- מספור רציף
- אם אין שומות בתיק — דווח רק על תכניות שזוהו ממסמכים אחרים, וציין במשפט אחד שלא הוגשו שומות
## פרטי התיק:
{case_context}
## תכניות שזוהו (ממטא-דאטה של מסמכים):
{plans_context}
## עובדות שמאיות שחולצו (תכניות + היתרים, פרק לכל שמאי):
{appraiser_facts_context}
## סתירות שזוהו בין שמאים (חובה לסמן בנוסח):
{appraiser_conflicts_context}
## חומרי מקור:
{source_context}""",
"block-yod": """כתוב את בלוק הדיון וההכרעה (בלוק י) של החלטת ועדת ערר.
## זהו הבלוק הקריטי ביותר — ליבת ההחלטה (ratio decidendi).
## אורך נדרש: **2,000-4,000 מילים לפחות**. זהו הבלוק הארוך ביותר בהחלטה (35-50%).
{methodology_guidance}
{content_checklist}
## כללים נוספים:
- **ללא כפילות** — הפנה לבלוקים קודמים: "כאמור בסעיף X לעיל"
- **מספור רציף** — המשך מספור מהבלוק הקודם
- מותרות כותרות-משנה כשיש נושאים נפרדים לחלוטין
## כיוון מאושר (חובה):
{direction_context}
## מבנה לפי תוצאה:
{structure_guidance}
## טענות:
{claims_context}
## חומרי מקור:
{source_context}
## דוגמאות-סגנון מהחלטות דפנה — מבנה וקול בלבד:
⚠️ אלה דוגמאות ל**איך** דפנה כותבת (מבנה, קצב, תנועות-הנמקה, ביטויים) — **לא מקור-תוכן**. הכלל המבחין: נוסחה/בוילרפלייט קבוע (פתיח דוקטרינלי, תבנית-סיום) → מותר להעתיק; ניתוח/טענות ספציפיים → **הכלל את הדפוס והתאם לתיק שלפניך**, אל תעתיק; מהות משפטית (הלכה/עובדה) מתיק אחר → **אסור** להעתיק.
{daphna_style_exemplars}
## פסיקה רלוונטית לציטוט (צטט מכאן ומהידע הכללי שלך):
{case_law_citations}
## סגנון דפנה:
{style_context}""",
"block-yod-alef": """כתוב את בלוק הסיכום (בלוק יא, "סוף דבר") של החלטת ועדת ערר.
## כללים:
- כותרת: "סוף דבר" או "סיכום"
- תוצאה ברורה: "הערר נדחה" / "הערר מתקבל" / "הערר מתקבל באופן חלקי"
- הוראות אופרטיביות חד-משמעיות
- אין חזרה על נימוקים — ההנמקה כבר בדיון
- מספור רציף
## מבנה לפי תוצאה:
- דחייה: "הערר נדחה" + תתי-סעיפים + פסקה חמה (רישוי בלבד)
- קבלה: "הערר מתקבל בכפוף ל..." + פרוזה
- קבלה חלקית: "הערר מתקבל באופן חלקי" + 2-3 הוראות אופרטיביות
## כיוון ותוצאה:
{direction_context}
## בלוקים קודמים (דיון):
{discussion_context}""",
}
# Discussion structure by outcome
# GAP-51: keyed by canonical outcomes (rejection/partial_acceptance/full_acceptance).
STRUCTURE_GUIDANCE = {
"rejection": "דחייה — שכבות הגנה (concentric circles): טענה ראשית → נדחית, טענה חלופית → נדחית, חיזוק.",
"full_acceptance": "קבלה — נימוק-נימוק: כל נימוק = CREAC מלא, בניית שכנוע הדרגתי.",
"partial_acceptance": "קבלה חלקית — מיפוי מתחים: מה מתקבל ולמה, מה נדחה ולמה, איזון.",
}
async def write_block(
case_id: UUID,
block_id: str,
instructions: str = "",
) -> dict:
"""כתיבת בלוק יחיד בהחלטה.
Args:
case_id: מזהה התיק
block_id: מזהה הבלוק (block-alef, block-he, block-yod, ...)
instructions: הנחיות נוספות
Returns:
dict עם content, word_count, block_id, generation_type
"""
if block_id not in BLOCK_CONFIG:
raise ValueError(f"Unknown block: {block_id}")
block_cfg = BLOCK_CONFIG[block_id]
case = await db.get_case(case_id)
if not case:
raise ValueError(f"Case {case_id} not found")
decision = await db.get_decision_by_case(case_id)
# Template blocks
if block_id in TEMPLATE_WRITERS:
content = TEMPLATE_WRITERS[block_id](case, decision)
r = _build_result(block_id, content, block_cfg)
r["sources"] = {"document_ids": [], "claim_ids": [], "case_law_ids": []}
return r
# AI-generated blocks
prompt_template = BLOCK_PROMPTS.get(block_id)
if not prompt_template:
raise ValueError(f"No prompt template for {block_id}")
# Build context components
case_context = _build_case_context(case, decision)
source_context = await _build_source_context(case_id, block_id)
claims_context = await _build_claims_context(case_id)
direction_context = _build_direction_context(decision)
plans_context = await _build_plans_context(case_id)
daphna_style_exemplars, case_law_citations, _precedent_case_law_ids = (
await _build_precedents_context(case_id, block_id)
)
style_context = await _build_style_context(case.get("practice_area", ""))
discussion_context = await _build_previous_blocks_context(case_id, decision)
appraiser_facts_context = await _build_appraiser_facts_context(case_id)
appraiser_conflicts_context = await _build_appraiser_conflicts_context(case_id)
post_hearing_context = await _build_post_hearing_context(case_id)
outcome = canonical_outcome((decision or {}).get("outcome", "rejection"))
structure_guidance = STRUCTURE_GUIDANCE.get(outcome, "")
if case.get("practice_area") == "betterment_levy":
structure_guidance = (
structure_guidance + " | היטל השבחה: "
+ " ".join(PRACTICE_AREA_OVERRIDES["betterment_levy"]["discussion_rules"])
).strip()
# Content checklist — tells block-yod WHAT topics to cover
content_checklist = ""
methodology_guidance = ""
if block_id == "block-yod":
content_checklist = get_content_checklist(
appeal_type=case.get("appeal_type", ""),
subject=case.get("subject", ""),
subject_categories=case.get("subject_categories", []),
)
# Methodology guidance — tells block-yod HOW to reason (universal, not case-specific)
methodology_guidance = get_methodology_summary()
# Format prompt — per Anthropic long-context best practices:
# Place source documents FIRST (top of prompt), instructions LAST.
# "Queries at the end can improve response quality by up to 30%"
formatted_prompt = prompt_template.format(
case_context=case_context,
source_context=source_context,
claims_context=claims_context,
direction_context=direction_context,
plans_context=plans_context,
daphna_style_exemplars=daphna_style_exemplars,
case_law_citations=case_law_citations,
style_context=style_context,
discussion_context=discussion_context,
structure_guidance=structure_guidance,
content_checklist=content_checklist,
methodology_guidance=methodology_guidance,
appraiser_facts_context=appraiser_facts_context,
appraiser_conflicts_context=appraiser_conflicts_context,
post_hearing_context=post_hearing_context,
)
# source_context is already embedded inside formatted_prompt via {source_context} in the
# template. Do NOT prepend it again — doing so doubles the prompt size (was 465K chars).
prompt = formatted_prompt
if instructions:
prompt += f"\n\n## הנחיות נוספות:\n{instructions}"
# Block י requires approved direction
if block_id == "block-yod":
dir_doc = (decision or {}).get("direction_doc") or {}
if not dir_doc.get("approved"):
raise ValueError("לא ניתן לכתוב בלוק דיון ללא כיוון מאושר. הפעל brainstorm → approve_direction קודם.")
# Guard against context overflow before calling claude -p.
# Sonnet: 200K context → ~800K chars max; Opus: 200K context → same.
# In practice the CLI has crashed on prompts above ~400K chars, so use
# that as a conservative ceiling (well below the token limit).
_MAX_PROMPT_CHARS = 400_000
if len(prompt) > _MAX_PROMPT_CHARS:
raise RuntimeError(
f"Prompt too large for {block_id}: {len(prompt):,} chars "
f"(limit {_MAX_PROMPT_CHARS:,}). "
f"source_context: {len(source_context):,} chars. "
f"Reduce documents or call extract_appraiser_facts first."
)
# Call Claude via Claude Code session (no API)
model_key = block_cfg["model"]
timeout = claude_session.LONG_TIMEOUT if model_key == "opus" else claude_session.DEFAULT_TIMEOUT
content = await claude_session.query(prompt, timeout=timeout, tools="") # prose gen — no tool_use → no error_max_turns
sources = await _collect_block_sources(case_id, block_id)
sources["case_law_ids"] = _precedent_case_law_ids
result = _build_result(block_id, content, block_cfg)
result["sources"] = sources
return result
def _build_result(block_id: str, content: str, block_cfg: dict) -> dict:
word_count = len(content.split())
return {
"block_id": block_id,
"block_index": block_cfg["index"],
"title": block_cfg["title"],
"content": content,
"word_count": word_count,
"generation_type": block_cfg["gen_type"],
"model_used": block_cfg["model"],
"temperature": block_cfg["temp"],
}
async def _collect_block_sources(case_id: UUID, block_id: str) -> dict:
"""Deterministic source ids available to a block's generation (GAP-19).
document_ids: case documents matching the block's allowed doc-types.
claim_ids: extracted claims for the case. (case_law_ids are captured
separately from the precedent search inside write_block.)
"""
allowed = _BLOCK_DOC_TYPES.get(block_id, []) # [] = all docs; None = no source docs
if allowed is None:
docs = [] # mirror _build_source_context: this block consumes no raw source docs
else:
docs = await db.list_documents(case_id)
if allowed:
docs = [d for d in docs if d.get("doc_type") in allowed]
claims = await db.get_claims(case_id)
return {
"document_ids": [str(d["id"]) for d in docs],
"claim_ids": [str(c["id"]) for c in claims],
}
# ── Context builders ──────────────────────────────────────────────
def _build_case_context(case: dict, decision: dict | None) -> str:
outcome = canonical_outcome((decision or {}).get("outcome", ""))
outcome_heb = OUTCOME_LABELS_HE.get(outcome, "")
return f"""- מספר תיק: {case['case_number']}
- כותרת: {case.get('title', '')}
- עוררים: {', '.join(case.get('appellants', []))}
- משיבים: {', '.join(case.get('respondents', []))}
- נושא: {case.get('subject', '')}
- כתובת: {case.get('property_address', '')}
- סוג ערר: {case.get('appeal_type', '')}
- תוצאה: {outcome_heb}"""
# Which doc_types are relevant per block.
# None → skip source docs entirely (block uses other context, e.g. claims_context)
# [] → include all doc types (default for unspecified blocks)
# [..] → include only the listed doc_type values
_BLOCK_DOC_TYPES: dict[str, list[str] | None] = {
"block-he": None, # only case_context needed; no full docs
"block-vav": ["appeal", "protocol"], # כתב ערר + פרוטוקול ועדה
"block-zayin": None, # claims_context is sufficient
"block-chet": ["protocol"], # פרוטוקול + השלמות טיעון
"block-tet": ["appraisal"], # שומות בלבד
# block-yod, block-yod-alef, block-he etc. default → all docs
}
async def _build_source_context(case_id: UUID, block_id: str) -> str:
"""Get document texts for the block, filtered by relevance.
Per Anthropic best practices: send full source documents, not truncated excerpts.
Per-block filtering prevents context overflow on large cases (9+ docs).
"""
allowed = _BLOCK_DOC_TYPES.get(block_id, []) # [] sentinel = not in map → all docs
if allowed is None:
return "" # this block doesn't need raw source docs
docs = await db.list_documents(case_id)
context_parts = []
for doc in docs:
if allowed and doc["doc_type"] not in allowed:
continue
text = await db.get_document_text(UUID(doc["id"]))
if text:
context_parts.append(f"--- מסמך: {doc['title']} ({doc['doc_type']}) ---\n{text}")
return "\n\n".join(context_parts) if context_parts else "(אין מסמכים)"
async def _build_claims_context(case_id: UUID) -> str:
claims = await db.get_claims(case_id)
if not claims:
return "(לא חולצו טענות)"
# Filter out claims from block-zayin (decision summary) — use only
# claims extracted from original pleadings (appeal, response, etc.)
source_claims = [c for c in claims if c.get("source_document", "") != "block-zayin"]
if not source_claims:
# Fallback to all claims if no source claims exist
source_claims = claims
lines = []
current_role = ""
role_heb = {"appellant": "טענות העוררים", "respondent": "טענות המשיבים",
"committee": "עמדת הוועדה המקומית", "permit_applicant": "עמדת מבקשי ההיתר"}
claim_num = 0
for c in source_claims:
if c["party_role"] != current_role:
current_role = c["party_role"]
lines.append(f"\n### {role_heb.get(current_role, current_role)}")
claim_num += 1
lines.append(f"טענה #{claim_num}: {c['claim_text'][:400]}")
lines.append(f"\n**סה\"כ {claim_num} טענות. ענה על כל טענה מהותית; טענות [bundle] — אגד; טענות [skip] — ציון קצר בלבד.**")
return "\n".join(lines)
def _build_direction_context(decision: dict | None) -> str:
if not decision:
return "(לא הוגדר כיוון)"
dir_doc = decision.get("direction_doc") or {}
if not dir_doc.get("approved"):
return "(כיוון לא אושר)"
parts = []
outcome_heb = dir_doc.get("outcome_hebrew", "")
if outcome_heb:
parts.append(f"תוצאה: {outcome_heb}")
reasoning = dir_doc.get("reasoning", "")
if reasoning:
parts.append(f"נימוק: {reasoning}")
direction = dir_doc.get("selected_direction")
if direction:
parts.append(f"כיוון נבחר: {direction.get('name', '')}")
for r in direction.get("reasoning", []):
parts.append(f" - {r}")
for p in direction.get("precedents", []):
parts.append(f" פסיקה: {p}")
notes = dir_doc.get("additional_notes", "")
if notes:
parts.append(f"הערות: {notes}")
return "\n".join(parts) if parts else "(אין מסמך כיוון)"
async def _build_plans_context(case_id: UUID) -> str:
"""Get plan references from document metadata."""
docs = await db.list_documents(case_id)
plans = set()
for doc in docs:
metadata = doc.get("metadata") or {}
if isinstance(metadata, str):
metadata = json.loads(metadata)
refs = metadata.get("references", {})
for p in refs.get("plans", []):
plans.add(p.get("plan_name", ""))
if plans:
return "\n".join(f"- {p}" for p in sorted(plans) if p)
return "(לא זוהו תכניות)"
APPRAISER_SIDE_LABEL_HE = {
"committee": "שמאי הוועדה המקומית",
"appellant": "שמאי העורר",
"deciding": "שמאי מכריע",
"": "שמאי (לא תויג)",
}
# Sort key: committee → appellant → deciding → untagged. This matches the order
# used by db.detect_appraiser_conflicts so the deciding appraiser is last —
# i.e. the conclusion reads most naturally ("...and the deciding appraiser ruled...").
_SIDE_ORDER = {"committee": 1, "appellant": 2, "deciding": 3, "": 4}
def _side_label(side: str) -> str:
return APPRAISER_SIDE_LABEL_HE.get(side or "", APPRAISER_SIDE_LABEL_HE[""])
async def _build_appraiser_facts_context(case_id: UUID) -> str:
"""Group appraiser_facts by side (then name), list each appraiser's plans+permits."""
facts = await db.list_appraiser_facts(case_id)
if not facts:
return "(לא חולצו עובדות שמאיות. הרץ extract_appraiser_facts.)"
# (side, name) → {plan: [...], permit: [...]}
groups: dict[tuple[str, str], dict[str, list[dict]]] = {}
for f in facts:
key = (f.get("appraiser_side", "") or "", f["appraiser_name"])
bucket = groups.setdefault(key, {"plan": [], "permit": []})
bucket[f["fact_type"]].append(f)
ordered_keys = sorted(groups.keys(), key=lambda k: (_SIDE_ORDER.get(k[0], 9), k[1]))
lines: list[str] = []
for side, name in ordered_keys:
lines.append(f"\n### {_side_label(side)}{name}")
for label, key in (("תכניות", "plan"), ("היתרים", "permit")):
items = groups[(side, name)][key]
if not items:
continue
lines.append(f"**{label}:**")
for item in items:
details = item.get("details") or {}
ident = item["identifier"]
scope = (details.get("scope") or "").strip()
date_s = (details.get("date") or "").strip()
status = (details.get("status") or "").strip()
quote = (details.get("raw_quote") or "").strip()
bits = [ident]
if date_s:
bits.append(f"תאריך: {date_s}")
if status:
bits.append(f"סטטוס: {status}")
if scope:
bits.append(f"היקף: {scope}")
line = " | ".join(bits)
if quote:
line += f"\n ציטוט: \"{quote[:200]}\""
lines.append(f"- {line}")
return "\n".join(lines)
async def _build_appraiser_conflicts_context(case_id: UUID) -> str:
"""Render conflict groups so the prompt can quote them in the body.
Entries arrive pre-ordered from the DB by side (committee→appellant→deciding).
When a deciding appraiser exists, the prompt must treat their view as the
governing factual determination.
"""
conflicts = await db.detect_appraiser_conflicts(case_id)
if not conflicts:
return "(אין סתירות בין שמאים)"
type_label = {"plan": "תכנית", "permit": "היתר"}
lines: list[str] = []
for c in conflicts:
has_deciding = any(e.get("appraiser_side") == "deciding" for e in c["entries"])
header = f"\n### סתירה — {type_label.get(c['fact_type'], c['fact_type'])}: {c['identifier']}"
if has_deciding:
header += " _(יש שמאי מכריע — עמדתו קובעת)_"
lines.append(header)
for entry in c["entries"]:
side = entry.get("appraiser_side", "") or ""
details = entry.get("details") or {}
scope = (details.get("scope") or "").strip()
status = (details.get("status") or "").strip()
quote = (details.get("raw_quote") or "").strip()
marker = "" if side == "deciding" else ""
parts = [f"**{marker}{_side_label(side)}{entry['appraiser_name']}**"]
if status:
parts.append(f"סטטוס: {status}")
if scope:
parts.append(f"היקף: {scope}")
line = " | ".join(parts)
if quote:
line += f"\n ציטוט: \"{quote[:200]}\""
lines.append(f"- {line}")
return "\n".join(lines)
async def _build_post_hearing_context(case_id: UUID) -> str:
"""List documents flagged as submitted after the hearing.
Convention: documents.metadata.is_post_hearing == True.
"""
docs = await db.list_documents(case_id)
items: list[dict] = []
for d in docs:
meta = d.get("metadata") or {}
if isinstance(meta, str):
meta = json.loads(meta)
if not meta.get("is_post_hearing"):
continue
items.append({
"title": d.get("title", ""),
"doc_type": d.get("doc_type", ""),
"submitted_on": meta.get("submitted_on", ""),
"kind": meta.get("post_hearing_kind", ""), # "supplementary_brief" | "settlement_proposal" | ...
})
if not items:
return "(לא הוגשו מסמכים לאחר הדיון, או שהם לא סומנו כ-post_hearing)"
lines: list[str] = []
for it in items:
meta_bits = []
if it["submitted_on"]:
meta_bits.append(f"הוגש: {it['submitted_on']}")
if it["kind"]:
meta_bits.append(f"סוג: {it['kind']}")
if it["doc_type"]:
meta_bits.append(f"doc_type={it['doc_type']}")
meta_str = f" ({', '.join(meta_bits)})" if meta_bits else ""
lines.append(f"- {it['title']}{meta_str}")
return "\n".join(lines)
async def _build_precedents_context(
case_id: UUID, block_id: str,
) -> tuple[str, str, list[str]]:
"""Two SEPARATE streams (INV-LRN5 — keep style apart from substance):
1. style_exemplars — Dafna's own block-level paragraphs (HOW she writes; structure/voice).
2. case_law_citations — precedent case-law (substantive material to quote).
Returns (style_exemplars, case_law_citations, case_law_ids).
"""
style_parts: list[str] = []
caselaw_parts: list[str] = []
case_law_ids: list[str] = []
# block → golden-ratio section, for targeted exemplar retrieval (T2)
_BLOCK_SECTION = {
"block-vav": "background", "block-zayin": "claims",
"block-yod": "discussion", "block-yod-alef": "summary",
}
try:
case = await db.get_case(case_id)
case_number = case.get("case_number", "") if case else ""
subject = case.get("subject", "") if case else ""
practice_area = case.get("practice_area", "") if case else ""
decision = await db.get_decision_by_case(case_id)
outcome = (decision or {}).get("outcome", "")
query = f"דיון משפטי בנושא {subject}" if subject else "דיון משפטי ועדת ערר"
query_emb = await embeddings.embed_query(query)
section = _BLOCK_SECTION.get(block_id)
# Stream 1a (PRIMARY): Dafna's own block-level prose from her corpus
# (style_exemplars) — matched by section + outcome + practice_area (T2/T3).
if section:
exemplars = await db.search_style_exemplars(
query_embedding=query_emb, section=section,
outcome=outcome or None, practice_area=practice_area or None, limit=6,
)
exemplars = [e for e in exemplars if e.get("decision_number", "") != case_number]
for e in exemplars[:4]:
style_parts.append(
f"[דוגמת-סגנון (מבנה/קול בלבד — התאם, אל תעתיק תוכן) — "
f"{e.get('decision_number', '?')}, {section}, "
f"outcome={e.get('outcome') or ''}]\n{e['paragraph_text'][:1100]}"
)
# Stream 1b: paragraphs from pipeline cases (legacy path; may be empty)
para_results = await db.search_similar_paragraphs(
query_embedding=query_emb, limit=10, block_type="block-yod",
)
para_results = [r for r in para_results if r.get("case_number", "") != case_number]
for r in para_results[:2]:
style_parts.append(
f"[דוגמת-סגנון — החלטת {r.get('case_number', '?')} "
f"{r.get('case_title', '')}, בלוק {r.get('block_type', '')}]\n{r['content'][:500]}"
)
# Stream 2: case_law_embeddings — substantive precedent (citations)
pool = await db.get_pool()
async with pool.acquire() as conn:
caselaw_rows = await conn.fetch(
"""SELECT cl.id, cl.case_number, cl.case_name, cl.court, cl.summary, cl.key_quote,
1 - (cle.embedding <=> $1) AS score
FROM case_law_embeddings cle
JOIN case_law cl ON cl.id = cle.case_law_id
ORDER BY cle.embedding <=> $1
LIMIT 5""",
query_emb,
)
for r in caselaw_rows[:3]:
case_law_ids.append(str(r["id"]))
text = r["key_quote"] or r["summary"] or ""
if text:
caselaw_parts.append(
f"[פסיקה: {r['case_number']} {r['case_name']} ({r.get('court', '')})] "
f"score={r['score']:.3f}\n{text[:400]}"
)
except Exception as e:
logger.warning("Failed to fetch precedents: %s", e)
return (
"\n\n".join(style_parts) if style_parts else "(אין דוגמאות-סגנון)",
"\n\n".join(caselaw_parts) if caselaw_parts else "(אין פסיקה רלוונטית)",
case_law_ids,
)
# Cache for the abstract voice profile (read once per process).
_VOICE_FINGERPRINT_CACHE: str | None = None
# Style-acquisition policy (INV-LRN5): how to USE the style material below.
_COPY_POLICY = """## מדיניות-סגנון (איך להשתמש בחומר שלהלן) — חובה:
**היעד: לכתוב בקול ובשיטה של דפנה — לא להעתיק.** הפרופיל שלהלן הוא ההכללה של *איך* דפנה כותבת; הַחֵל אותו על העובדות של התיק שלפניך.
- **תוכן קבוע/נוסחאי** (פתיח דוקטרינלי, תבנית-סיום, ביטויי-מעבר) → מותר להשתמש כלשונו.
- **ניתוח/טענות ספציפיים** → הכלל את הדפוס והתאם לתיק; אל תעתיק ניסוח מתיק אחר.
- **מהות משפטית (הלכה/עובדה/תקדים) מתיק אחר** → אסור לגרור לכאן; המהות באה מחומרי-המקור והפסיקה של *התיק הזה* בלבד.
"""
def _load_voice_fingerprint() -> str:
"""Load the abstract authorial-style profile (daphna-voice-fingerprint.md).
This is the PRIMARY style channel (Authorial Style Profiling): the generalized
'how Dafna writes', injected so the writer adapts it rather than copying exemplars.
Read-only consumption of a learning artifact (Writing↔Acquisition separation).
"""
global _VOICE_FINGERPRINT_CACHE
if _VOICE_FINGERPRINT_CACHE is not None:
return _VOICE_FINGERPRINT_CACHE
try:
path = config.DATA_DIR.parent / "docs" / "daphna-voice-fingerprint.md"
_VOICE_FINGERPRINT_CACHE = path.read_text(encoding="utf-8")
except Exception as e:
logger.warning("voice-fingerprint not loaded: %s", e)
_VOICE_FINGERPRINT_CACHE = ""
return _VOICE_FINGERPRINT_CACHE
async def _build_style_context(practice_area: str = "") -> str:
"""Build comprehensive style guide: abstract voice profile (primary) +
SKILL.md rules + DB patterns + accumulated chair learnings.
Per Anthropic: explicit style instructions reduce generic output. The voice
fingerprint is the primary abstract-profile channel (T0 / INV-LRN4-5).
Accumulated learnings (T15) — the chair's /methodology edits and /training
decision_lessons — are appended LAST and marked authoritative, so everything
we have learned to date reaches the writer (not just hardcoded defaults).
"""
lines = []
# Copy-policy first, then the abstract voice profile (the PRIMARY channel).
lines.append(_COPY_POLICY)
fingerprint = _load_voice_fingerprint()
if fingerprint:
lines.append("## פרופיל-הקול של דפנה (טביעת-אצבע — המנגנון המרכזי):\n")
lines.append(fingerprint)
# Core style rules (from SKILL.md analysis)
lines.append("""## כללי סגנון דפנה תמיר — חובה:
### טון:
- ערר רישוי (1xxx): חם יחסית, עם אלמנטים אנושיים
- ערר השבחה (8xxx): קר, יבש, מקצועי
- גוף ראשון רבים: "אנו סבורים", "מצאנו כי", "לדעתנו"
- ישיר ובהיר — לא אקדמי ולא מסורבל
### ביטויים ייחודיים (חובה להשתמש):
- "לפנינו..." (פתיחה)
- "כידוע..." (הצגת עקרון ידוע)
- "ברי כי..." / "ודוק..." (הדגשה)
- "אין בידנו לקבל" (דחיית טענה)
- "בטענה זו מצאנו טעם" (קבלת טענה)
- "יחד עם זאת" (מעבר לאיזון)
- "למעלה מן הצורך" / "נבקש שלא לצאת בחסר" (הרחבה)
- "הדברים מתחדדים שעה ש..." (חידוד)
- "מחד... מאידך... על כן..." (איזון לפני הכרעה)
- "לאור כל האמור לעיל" (סיכום)
- "ניתנה פה אחד היום" (סיום)
### מבנה דיון:
- אסה רציפה ללא כותרות משנה (חריג: נושאים נפרדים לחלוטין)
- מסקנה בפתיחה, לא בסוף
- מעברים טקסטואליים, לא כותרות
- ניטרול טענות חלשות לפני ניתוח מעמיק
- ציטוטי פסיקה כבלוקים מוגדלים
### טענות צדדים:
- עוררים: "העוררים טוענים כי...", "לטענתם...", "עוד ציינו כי..."
- ועדה: "הוועדה המקומית הציגה/הבהירה/הוסיפה כי..."
- מבקשי היתר: "מבקשי ההיתר דוחים מכל וכל...", "לטענתם...", "מבקשי ההיתר מציינים כי..."
""")
# DB patterns (actual examples from Dafna's decisions)
patterns = await db.get_style_patterns()
if patterns:
lines.append("### דפוסים שחולצו מהחלטות קודמות:")
grouped: dict[str, list] = {}
for p in patterns:
grouped.setdefault(p["pattern_type"], []).append(p)
type_names = {
"opening_formula": "פתיחה",
"transition": "מעברים",
"characteristic_phrase": "ביטויים אופייניים",
"closing_formula": "סיום",
"citation_style": "ציטוט",
}
for ptype in ["characteristic_phrase", "transition", "opening_formula", "closing_formula"]:
items = grouped.get(ptype, [])
if items:
lines.append(f"\n**{type_names.get(ptype, ptype)}:**")
for item in items[:8]:
lines.append(f"- {item['pattern_text']}")
# ── למידה מצטברת (T15) — עריכות היו"ר ב-/methodology + לקחי /training ──
# גובר על ברירות-המחדל לעיל. כך כל מה שלמדנו עד היום מגיע לכותב.
learned: list[str] = []
try:
for cat, label in (
("golden_ratios", "יחסי-זהב (אחוזי-סעיפים)"),
("discussion_rules", "כללי-דיון"),
("content_checklists", "צ׳קליסטים"),
("transition_phrases", "ביטויי-מעבר"),
("anti_patterns", "אנטי-דפוסים (להימנע)"),
):
ov = await db.get_methodology_overrides(cat)
if ov:
learned.append(f"\n**{label} — ערכי היו\"ר (גוברים על ברירת-המחדל):**")
for k, v in ov.items():
learned.append(f"- {k}: {json.dumps(v, ensure_ascii=False)}")
except Exception as e:
logger.warning("methodology overrides not loaded: %s", e)
try:
lessons = await db.get_recent_decision_lessons(limit=15, practice_area=practice_area)
if lessons:
learned.append("\n**לקחים מהחלטות קודמות (decision_lessons):**")
for ls in lessons:
src = ls.get("decision_number") or ls.get("source") or ""
learned.append(f"- [{ls.get('category', '')}] {ls['lesson_text']}" + (f" ({src})" if src else ""))
except Exception as e:
logger.warning("decision_lessons not loaded: %s", e)
if learned:
lines.append(
"\n## ⭐ למידה מצטברת — חובה, גובר על כל ברירת-מחדל לעיל "
"(עריכות היו\"ר ב-/methodology + לקחי /training):"
)
lines.extend(learned)
return "\n".join(lines)
async def _build_previous_blocks_context(case_id: UUID, decision: dict | None) -> str:
"""Get content of previously written blocks."""
if not decision:
return "(אין בלוקים קודמים)"
pool = await db.get_pool()
async with pool.acquire() as conn:
rows = await conn.fetch(
"""SELECT block_id, title, content, word_count
FROM decision_blocks
WHERE decision_id = $1 AND word_count > 0
ORDER BY block_index""",
UUID(decision["id"]),
)
if not rows:
return "(אין בלוקים קודמים)"
parts = []
for r in rows:
content = r["content"][:2000]
parts.append(f"### {r['title']} ({r['block_id']})\n{content}")
return "\n\n".join(parts)
# ── Context-only mode (for Claude Code to write) ─────────────────
async def get_block_context(case_id: UUID, block_id: str, instructions: str = "") -> dict:
"""Return full context package for a block WITHOUT calling Claude API.
Claude Code (or any external writer) uses this context to write the block,
then saves it via save_block_content.
"""
if block_id not in BLOCK_CONFIG:
raise ValueError(f"Unknown block: {block_id}")
block_cfg = BLOCK_CONFIG[block_id]
case = await db.get_case(case_id)
if not case:
raise ValueError(f"Case {case_id} not found")
decision = await db.get_decision_by_case(case_id)
# Template blocks — return content directly
if block_id in TEMPLATE_WRITERS:
content = TEMPLATE_WRITERS[block_id](case, decision)
return {
"block_id": block_id,
"title": block_cfg["title"],
"mode": "template",
"content": content,
}
# Build all context components
prompt_template = BLOCK_PROMPTS.get(block_id, "")
case_context = _build_case_context(case, decision)
source_context = await _build_source_context(case_id, block_id)
claims_context = await _build_claims_context(case_id)
direction_context = _build_direction_context(decision)
plans_context = await _build_plans_context(case_id)
daphna_style_exemplars, case_law_citations, _ = (
await _build_precedents_context(case_id, block_id)
)
style_context = await _build_style_context(case.get("practice_area", ""))
discussion_context = await _build_previous_blocks_context(case_id, decision)
appraiser_facts_context = await _build_appraiser_facts_context(case_id)
appraiser_conflicts_context = await _build_appraiser_conflicts_context(case_id)
post_hearing_context = await _build_post_hearing_context(case_id)
outcome = canonical_outcome((decision or {}).get("outcome", "rejection"))
structure_guidance = STRUCTURE_GUIDANCE.get(outcome, "")
if case.get("practice_area") == "betterment_levy":
structure_guidance = (
structure_guidance + " | היטל השבחה: "
+ " ".join(PRACTICE_AREA_OVERRIDES["betterment_levy"]["discussion_rules"])
).strip()
# Content checklist + methodology for block-yod
content_checklist = ""
methodology_guidance = ""
if block_id == "block-yod":
content_checklist = get_content_checklist(
appeal_type=case.get("appeal_type", ""),
subject=case.get("subject", ""),
subject_categories=case.get("subject_categories", []),
)
methodology_guidance = get_methodology_summary()
formatted_prompt = prompt_template.format(
case_context=case_context,
source_context=source_context,
claims_context=claims_context,
direction_context=direction_context,
plans_context=plans_context,
daphna_style_exemplars=daphna_style_exemplars,
case_law_citations=case_law_citations,
style_context=style_context,
discussion_context=discussion_context,
structure_guidance=structure_guidance,
content_checklist=content_checklist,
methodology_guidance=methodology_guidance,
appraiser_facts_context=appraiser_facts_context,
appraiser_conflicts_context=appraiser_conflicts_context,
post_hearing_context=post_hearing_context,
)
if instructions:
formatted_prompt += f"\n\n## הנחיות נוספות:\n{instructions}"
# Block י requires approved direction
if block_id == "block-yod":
dir_doc = (decision or {}).get("direction_doc") or {}
if not dir_doc.get("approved"):
raise ValueError("לא ניתן לכתוב בלוק דיון ללא כיוון מאושר.")
return {
"block_id": block_id,
"title": block_cfg["title"],
"mode": "context",
"prompt": formatted_prompt,
"source_documents": source_context,
"claims": claims_context,
"direction": direction_context,
"precedents": case_law_citations,
"style_exemplars": daphna_style_exemplars,
"style_guide": style_context,
"previous_blocks": discussion_context,
}
async def save_block_content(case_id: UUID, block_id: str, content: str) -> dict:
"""Save block content written by Claude Code (or any external writer).
Saves to DB and also writes/updates the draft file on disk.
"""
if block_id not in BLOCK_CONFIG:
raise ValueError(f"Unknown block: {block_id}")
block_cfg = BLOCK_CONFIG[block_id]
decision = await db.get_decision_by_case(case_id)
if not decision:
decision = await db.create_decision(case_id=case_id)
result = _build_result(block_id, content, block_cfg)
result["generation_type"] = "claude-code"
result["model_used"] = "claude-code"
await store_block(UUID(decision["id"]), result) # store_block syncs the file (#35)
await db.mark_blocks_stale(case_id, False)
return result
async def _update_draft_file(decision_id: UUID) -> None:
"""Rebuild drafts/decision.md from all blocks in DB — the single
regenerate-draft hook (lessons #35 / GAP-88). Called after EVERY
decision_blocks mutation (store_block, renumber) so the on-disk file never
drifts from the DB. legal-qa validates against the DB; export and the chair
read the file — keeping them identical kills the "QA fails twice on the same
already-fixed issue" loop (CMPA-62). Resolves case from decision_id so no
caller has to thread case_id through."""
pool = await db.get_pool()
async with pool.acquire() as conn:
case_row = await conn.fetchrow(
"SELECT c.case_number FROM decisions d JOIN cases c ON c.id = d.case_id "
"WHERE d.id = $1",
decision_id,
)
if not case_row:
return
rows = await conn.fetch(
"SELECT content FROM decision_blocks WHERE decision_id = $1 AND content != '' ORDER BY block_index",
decision_id,
)
draft_dir = config.find_case_dir(case_row["case_number"]) / "drafts"
draft_dir.mkdir(parents=True, exist_ok=True)
draft_path = draft_dir / "decision.md"
draft_text = "\n\n".join(row["content"] for row in rows if row["content"])
draft_path.write_text(draft_text, encoding="utf-8") # noqa: STG1 — sealed below
try:
_dkey = draft_path.resolve().relative_to(Path(config.DATA_DIR).resolve()).as_posix()
await storage.mirror(_dkey, draft_text.encode("utf-8"), bucket=storage.Bucket.DOCUMENTS)
except ValueError:
pass
logger.info("Draft file synced: %s (%d blocks)", draft_path, len(rows))
# ── Renumbering ───────────────────────────────────────────────────
async def renumber_all_blocks(decision_id: UUID) -> dict:
"""מספור רציף מחדש של כל הבלוקים בהחלטה.
עובר על כל הבלוקים לפי סדר, ומחליף את כל המספורים
(1. 2. 3. או **1.** **2.**) לרצף אחד רציף.
"""
pool = await db.get_pool()
async with pool.acquire() as conn:
rows = await conn.fetch(
"""SELECT block_id, block_index, content, word_count
FROM decision_blocks WHERE decision_id = $1
ORDER BY block_index""",
decision_id,
)
current_num = 1
updated = 0
# Blocks that shouldn't be numbered
skip_blocks = {"block-alef", "block-bet", "block-gimel", "block-dalet", "block-yod-bet"}
for row in rows:
if row["block_id"] in skip_blocks or not row["content"]:
continue
content = row["content"]
# Replace numbered paragraphs: "N." or "**N.**" or "**N.**" at line start
def replace_num(match):
nonlocal current_num
prefix = match.group(1) or "" # bold markers
suffix = match.group(3) or "" # bold markers
result = f"{prefix}{current_num}{suffix}"
current_num += 1
return result
new_content = re.sub(
r'^(\*\*)?(\d+)(\.?\*?\*?\.)',
replace_num,
content,
flags=re.MULTILINE,
)
if new_content != content:
async with pool.acquire() as conn:
await conn.execute(
"UPDATE decision_blocks SET content = $1, updated_at = now() WHERE decision_id = $2 AND block_id = $3",
new_content, decision_id, row["block_id"],
)
updated += 1
# #35 — renumber mutates content via raw UPDATE (bypasses store_block), so
# sync the draft file here too, otherwise the file keeps stale numbering.
if updated:
await _update_draft_file(decision_id)
return {"total_paragraphs": current_num - 1, "blocks_updated": updated}
# ── Store block ───────────────────────────────────────────────────
async def store_block(decision_id: UUID, block_result: dict) -> None:
"""שמירת בלוק ב-DB (upsert)."""
pool = await db.get_pool()
async with pool.acquire() as conn:
await conn.execute(
"""INSERT INTO decision_blocks
(decision_id, block_id, block_index, title, content, word_count,
generation_type, model_used, temperature, status)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, 'draft')
ON CONFLICT (decision_id, block_id) DO UPDATE SET
content = EXCLUDED.content,
word_count = EXCLUDED.word_count,
generation_type = EXCLUDED.generation_type,
model_used = EXCLUDED.model_used,
temperature = EXCLUDED.temperature,
status = 'draft',
updated_at = now()""",
decision_id,
block_result["block_id"],
block_result["block_index"],
block_result["title"],
block_result["content"],
block_result["word_count"],
block_result["generation_type"],
block_result["model_used"],
block_result["temperature"],
)
# #35 — regenerate the on-disk draft on every persist so DB and file stay
# identical (legal-qa reads DB; export/chair read the file).
await _update_draft_file(decision_id)
async def write_and_store_block(
case_id: UUID,
block_id: str,
instructions: str = "",
) -> dict:
"""כתיבת בלוק ושמירה ב-DB."""
decision = await db.get_decision_by_case(case_id)
if not decision:
# Create decision if not exists
decision = await db.create_decision(case_id=case_id)
result = await write_block(case_id, block_id, instructions)
await store_block(UUID(decision["id"]), result)
await audit.log_action_safe(
"write_block", case_id=case_id,
details={
"decision_id": str(decision["id"]),
"block_id": block_id,
"model_used": result.get("model_used"),
"generation_type": result.get("generation_type"),
"sources": result.get("sources", {}),
},
)
await db.mark_blocks_stale(case_id, False)
return result