כש-extract_plans מוצא מספר-תכנית עם תוקף-חסר (תאריך-רשומות / י"פ),
upsert_candidates ממלא את החוסר מ-מנהל-התכנון לפני ה-upsert. הרשומה
עדיין נכנסת pending_review — ההעשרה משנה את המועמד, לא את שער-היו"ר.
שמרני בכוונה:
- ממלא רק שדות-חסרים — לא דורס ערכים מעוגני-תיק (display_name/purpose
מהחילוץ נשמרים).
- מגודר לפורמט-mavat מודרני (\d{2,4}-\d{6,8}); מספרים-ישנים (מי/820,
תמ"א 38) מדולגים (לא יבזבזו השקת-דפדפן).
- תקרה PLAN_ENRICH_MAX_PER_CALL=8 (מתועד אם נחצה — בלי silent-cap).
- fail-soft: גשר-למטה / לא-נמצא / חסום → המועמד נשאר כפי-שחולץ (לוג,
לא בליעה שקטה).
- דגל-כיבוי PLAN_ENRICH_FROM_MAVAT=0.
- מקור-ההעשרה מסומן ב-model_used="claude_local+mavat".
INV-AH: ערך-תוקף שנמשך נושא מקור (mavat); שדה-חסר נשאר ריק. G10: שער-
היו"ר נשמר. G2: מרחיב את plans_fetch (#292), לא מסלול מקביל.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
280 lines
12 KiB
Python
280 lines
12 KiB
Python
"""חילוץ מובנה של תכניות בניין-עיר ותוקפן לתוך מרשם-התכניות (טבלת plans).
|
||
|
||
תכלית: לבנות SSOT קנוני לתכניות שחוזרות בין תיקים — מספר-תכנית מנורמל, תוקף
|
||
(פרסום למתן תוקף ברשומות + מס' ילקוט-הפרסומים), ומשפט-ייעוד אחד — כדי שבלוק ט
|
||
יצטט אותן בנוסח אחיד ודטרמיניסטי (format_plan_citation) במקום לגזור מחדש מהשומות
|
||
בכל תיק (G2).
|
||
|
||
חילוץ עובדתי בלבד. הרשומות נכנסות review_status='pending_review' וממתינות
|
||
לאישור-יו"ר (INV-DM5/G10) לפני שישמשו בכתיבה. הקריאות ל-LLM מתבצעות דרך
|
||
claude_session המקומי בלבד (כמו שאר המחלצים) — לא Anthropic SDK ישיר.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import logging
|
||
import os
|
||
import re
|
||
from uuid import UUID
|
||
|
||
from legal_mcp.services import claude_session, db
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# Descriptive provenance tag for INV-DM4 (we call the local claude CLI session,
|
||
# not a pinned model id — the session model is whatever is configured).
|
||
MODEL_TAG = "claude_local"
|
||
|
||
# ── mavat auto-enrichment (Phase C trigger 2) ──────────────────────────────────
|
||
# When an extracted candidate is missing its validity (gazette_date / yalkut), we
|
||
# fill the gaps from the official source (mavat) via the host bridge. Conservative
|
||
# by design: only modern numeric plan numbers resolve on mavat search, each fetch
|
||
# drives a real browser (~30-60s, serial), so we gate by format + cap per call and
|
||
# fail soft. Set PLAN_ENRICH_FROM_MAVAT=0 to disable.
|
||
_ENRICH_ENABLED = os.environ.get("PLAN_ENRICH_FROM_MAVAT", "1").strip() not in ("0", "false", "")
|
||
_ENRICH_MAX_PER_CALL = int(os.environ.get("PLAN_ENRICH_MAX_PER_CALL", "8"))
|
||
# mavat search resolves the modern "NN-NNNNNNN" identifiers; legacy forms
|
||
# (מי/820, 5166/ב, תמ"א 38) don't, so don't waste a browser launch on them.
|
||
_MAVAT_NUM_RE = re.compile(r"^\d{2,4}-\d{6,8}$")
|
||
|
||
|
||
EXTRACT_PLANS_PROMPT = """אתה מחלץ מידע עובדתי על תכניות בניין-עיר (תב"ע) עבור מרשם-תכניות של ועדת ערר.
|
||
|
||
תפקידך: לחלץ כל תכנית שמצוין לגביה **תוקף** — מתי פורסמה למתן תוקף (ברשומות / בילקוט הפרסומים) — או ייעוד ברור.
|
||
|
||
## כללים
|
||
- עובדתי בלבד. אל תסיק, אל תפרש, ואל תמציא תאריך שאינו כתוב במפורש.
|
||
- חלץ רק תכניות שמופיע לגביהן מידע-תוקף או ייעוד ברור. דלג על אזכור-אגב ללא פרטים.
|
||
- gazette_date: תאריך הפרסום למתן תוקף, בפורמט ISO (YYYY-MM-DD). אם לא צוין תאריך — השאר "".
|
||
- yalkut_number: מספר ילקוט הפרסומים / י"פ אם צוין (למשל "5965"). אחרת "".
|
||
- display_name: שם-התכנית כפי שמקובל לכתוב בהחלטה, כולל המילה "תכנית" (למשל "תכנית מי/820").
|
||
- plan_number: מזהה-התכנית בלבד, ללא המילה "תכנית" (למשל "מי/820", "5166/ב", "152-0132902", "תמ\\"א 38").
|
||
- plan_type: אחד מ- ארצית / מחוזית / מקומית / מפורטת / כוללנית, אם ניתן לקבוע מהטקסט. אחרת "".
|
||
- purpose: משפט-ייעוד אחד תמציתי (מה התכנית עושה/משנה/קובעת). אחרת "".
|
||
- raw_quote: ציטוט מילולי של המשפט שממנו חולץ התוקף, עד 200 תווים.
|
||
|
||
## פלט
|
||
החזר JSON array בלבד — ללא markdown, ללא הסברים:
|
||
[
|
||
{
|
||
"plan_number": "מי/820",
|
||
"display_name": "תכנית מי/820",
|
||
"plan_type": "מקומית",
|
||
"gazette_date": "2001-08-09",
|
||
"yalkut_number": "",
|
||
"purpose": "משנה את הוראות תכנית מי/200 ומרחיבה את השימושים המותרים באזור חקלאי",
|
||
"raw_quote": "תוכנית מי/820 ... פורסמה למתן תוקף ביום 9.8.2001"
|
||
}
|
||
]
|
||
|
||
אם אין תכניות עם מידע-תוקף/ייעוד — החזר [].
|
||
"""
|
||
|
||
|
||
def _chunk_text(text: str, max_chars: int = 25000) -> list[str]:
|
||
"""Split a long document at paragraph boundaries (mirrors appraiser extractor)."""
|
||
if len(text) <= max_chars:
|
||
return [text]
|
||
chunks: list[str] = []
|
||
pos = 0
|
||
while pos < len(text):
|
||
end = min(pos + max_chars, len(text))
|
||
if end < len(text):
|
||
break_pos = text.rfind("\n\n", pos, end)
|
||
if break_pos > pos + max_chars // 2:
|
||
end = break_pos
|
||
chunks.append(text[pos:end])
|
||
pos = end
|
||
return chunks
|
||
|
||
|
||
async def extract_plans_from_text(text: str) -> list[dict]:
|
||
"""Extract plan candidates from arbitrary text via the local claude session.
|
||
|
||
Returns a list of normalized candidate dicts (not yet persisted). Factual only.
|
||
"""
|
||
candidates: list[dict] = []
|
||
chunks = _chunk_text(text)
|
||
for i, chunk in enumerate(chunks):
|
||
chunk_label = f" (חלק {i+1}/{len(chunks)})" if len(chunks) > 1 else ""
|
||
prompt = (
|
||
f"{EXTRACT_PLANS_PROMPT}\n\n"
|
||
f"--- תחילת מסמך{chunk_label} ---\n{chunk}\n--- סוף מסמך ---"
|
||
)
|
||
result = await claude_session.query_json(prompt, tools="") # no tool_use
|
||
if not isinstance(result, list):
|
||
logger.warning(
|
||
"extract_plans_from_text: chunk %d returned non-list (%s)",
|
||
i, type(result).__name__,
|
||
)
|
||
continue
|
||
for item in result:
|
||
if not isinstance(item, dict):
|
||
continue
|
||
num = (item.get("plan_number") or "").strip()
|
||
if not num:
|
||
continue
|
||
candidates.append({
|
||
"plan_number": num,
|
||
"display_name": (item.get("display_name") or "").strip(),
|
||
"plan_type": (item.get("plan_type") or "").strip(),
|
||
"gazette_date": (item.get("gazette_date") or "").strip(),
|
||
"yalkut_number": (item.get("yalkut_number") or "").strip(),
|
||
"purpose": (item.get("purpose") or "").strip(),
|
||
"raw_quote": (item.get("raw_quote") or "").strip(),
|
||
})
|
||
return candidates
|
||
|
||
|
||
def _needs_enrichment(c: dict) -> bool:
|
||
"""A candidate is worth enriching iff its validity is incomplete AND its
|
||
number is a mavat-resolvable modern identifier."""
|
||
if not (_ENRICH_ENABLED and _MAVAT_NUM_RE.match((c.get("plan_number") or "").strip())):
|
||
return False
|
||
return not (c.get("gazette_date") and c.get("yalkut_number"))
|
||
|
||
|
||
async def _enrich_from_mavat(c: dict) -> tuple[dict, bool]:
|
||
"""Fill a candidate's MISSING fields from mavat (never override case-grounded
|
||
values). Returns (candidate, enriched?). Fails soft — a bridge-down / not-found
|
||
/ blocked fetch leaves the candidate untouched (logged, never swallowed)."""
|
||
from legal_mcp.services import plans_fetch
|
||
|
||
num = c["plan_number"].strip()
|
||
try:
|
||
fetched = await plans_fetch.fetch_plan(num)
|
||
except plans_fetch.PlanFetchUnavailable as e:
|
||
logger.info("plan-enrich: bridge unavailable for %s — %s", num, e)
|
||
return c, False
|
||
except plans_fetch.PlanFetchError as e:
|
||
logger.info("plan-enrich: mavat had no usable result for %s — %s", num, e)
|
||
return c, False
|
||
except Exception as e: # noqa: BLE001 — never let enrichment break extraction
|
||
logger.warning("plan-enrich: unexpected error for %s — %s", num, e)
|
||
return c, False
|
||
|
||
enriched = dict(c)
|
||
filled: list[str] = []
|
||
for f in ("gazette_date", "yalkut_number", "display_name", "plan_type", "purpose"):
|
||
if not enriched.get(f) and fetched.get(f):
|
||
enriched[f] = fetched[f]
|
||
filled.append(f)
|
||
if filled:
|
||
logger.info("plan-enrich: %s filled %s from mavat (%s)",
|
||
num, ",".join(filled), fetched.get("source_url", ""))
|
||
return enriched, True
|
||
return c, False
|
||
|
||
|
||
async def upsert_candidates(
|
||
candidates: list[dict],
|
||
*,
|
||
source_case_number: str = "",
|
||
source_document_id: UUID | None = None,
|
||
model_used: str = MODEL_TAG,
|
||
enrich: bool = True,
|
||
) -> list[dict]:
|
||
"""Upsert extracted candidates into the registry as pending_review (G10).
|
||
|
||
When ``enrich`` (default) and a candidate's validity is incomplete, its
|
||
missing fields are pulled from mavat first (capped per call). The row still
|
||
enters pending_review — enrichment changes the candidate, not the chair gate.
|
||
"""
|
||
out: list[dict] = []
|
||
enriched_count = 0
|
||
for c in candidates:
|
||
used = model_used
|
||
if enrich and enriched_count < _ENRICH_MAX_PER_CALL and _needs_enrichment(c):
|
||
c, did = await _enrich_from_mavat(c)
|
||
if did:
|
||
enriched_count += 1
|
||
used = f"{model_used}+mavat"
|
||
try:
|
||
plan = await db.upsert_plan(
|
||
plan_number=c["plan_number"],
|
||
display_name=c.get("display_name", ""),
|
||
plan_type=c.get("plan_type", ""),
|
||
gazette_date=c.get("gazette_date") or None,
|
||
yalkut_number=c.get("yalkut_number", ""),
|
||
purpose=c.get("purpose", ""),
|
||
review_status="pending_review",
|
||
source_case_number=source_case_number,
|
||
source_document_id=source_document_id,
|
||
model_used=used,
|
||
)
|
||
out.append(plan)
|
||
except ValueError as e:
|
||
# Don't swallow — surface the bad candidate so it isn't silently dropped.
|
||
logger.warning("upsert_candidates: skipped %r — %s", c.get("plan_number"), e)
|
||
if enrich and enriched_count >= _ENRICH_MAX_PER_CALL:
|
||
logger.warning(
|
||
"plan-enrich: hit the per-call cap (%d) — remaining candidates kept "
|
||
"as-extracted (no silent truncation; raise PLAN_ENRICH_MAX_PER_CALL).",
|
||
_ENRICH_MAX_PER_CALL,
|
||
)
|
||
return out
|
||
|
||
|
||
async def extract_plans_for_case(case_id: UUID) -> dict:
|
||
"""Extract plan candidates from every document with text in the case.
|
||
|
||
Upserts them into the registry as pending_review. Thorough by design (we do not
|
||
pre-filter by doc_type — a plan's validity can be cited anywhere). Returns a
|
||
summary for serialization back to the caller.
|
||
"""
|
||
case = await db.get_case(case_id)
|
||
source_case_number = (case or {}).get("case_number", "") or ""
|
||
docs = await db.list_documents(case_id)
|
||
|
||
by_doc: list[dict] = []
|
||
seen_numbers: dict[str, dict] = {}
|
||
total_candidates = 0
|
||
for doc in docs:
|
||
text = await db.get_document_text(UUID(doc["id"]))
|
||
if not text:
|
||
continue
|
||
try:
|
||
cands = await extract_plans_from_text(text)
|
||
except Exception as e: # noqa: BLE001 — record, don't swallow
|
||
logger.exception("extract_plans_for_case: failed on doc %s", doc["id"])
|
||
by_doc.append({
|
||
"document_id": doc["id"], "title": doc.get("title", ""),
|
||
"status": "error", "error": str(e), "candidates": 0,
|
||
})
|
||
continue
|
||
plans = await upsert_candidates(
|
||
cands,
|
||
source_case_number=source_case_number,
|
||
source_document_id=UUID(doc["id"]),
|
||
)
|
||
total_candidates += len(cands)
|
||
for p in plans:
|
||
seen_numbers[p["plan_number"]] = p
|
||
by_doc.append({
|
||
"document_id": doc["id"], "title": doc.get("title", ""),
|
||
"status": "completed", "candidates": len(cands),
|
||
})
|
||
|
||
# Surface near-duplicates for the chair to merge manually (G10) — never
|
||
# auto-merged. A variant of an existing plan written differently won't share
|
||
# the normalized key, so flag it here instead of silently creating a dup.
|
||
plans_out = list(seen_numbers.values())
|
||
dup_hits = 0
|
||
for p in plans_out:
|
||
sims = await db.find_similar_plans(
|
||
p["plan_number"], p.get("display_name", ""), exclude_id=UUID(p["id"]),
|
||
)
|
||
p["possible_duplicates"] = sims
|
||
dup_hits += len(sims)
|
||
|
||
return {
|
||
"status": "completed",
|
||
"case_number": source_case_number,
|
||
"documents_scanned": len(by_doc),
|
||
"total_candidates": total_candidates,
|
||
"distinct_plans": len(plans_out),
|
||
"possible_duplicate_hits": dup_hits,
|
||
"plans": plans_out,
|
||
"by_document": by_doc,
|
||
}
|