"""חילוץ מובנה של תכניות בניין-עיר ותוקפן לתוך מרשם-התכניות (טבלת plans). תכלית: לבנות SSOT קנוני לתכניות שחוזרות בין תיקים — מספר-תכנית מנורמל, תוקף (פרסום למתן תוקף ברשומות + מס' ילקוט-הפרסומים), ומשפט-ייעוד אחד — כדי שבלוק ט יצטט אותן בנוסח אחיד ודטרמיניסטי (format_plan_citation) במקום לגזור מחדש מהשומות בכל תיק (G2). חילוץ עובדתי בלבד. הרשומות נכנסות review_status='pending_review' וממתינות לאישור-יו"ר (INV-DM5/G10) לפני שישמשו בכתיבה. הקריאות ל-LLM מתבצעות דרך claude_session המקומי בלבד (כמו שאר המחלצים) — לא Anthropic SDK ישיר. """ from __future__ import annotations import logging from uuid import UUID from legal_mcp.services import claude_session, db logger = logging.getLogger(__name__) # Descriptive provenance tag for INV-DM4 (we call the local claude CLI session, # not a pinned model id — the session model is whatever is configured). MODEL_TAG = "claude_local" EXTRACT_PLANS_PROMPT = """אתה מחלץ מידע עובדתי על תכניות בניין-עיר (תב"ע) עבור מרשם-תכניות של ועדת ערר. תפקידך: לחלץ כל תכנית שמצוין לגביה **תוקף** — מתי פורסמה למתן תוקף (ברשומות / בילקוט הפרסומים) — או ייעוד ברור. ## כללים - עובדתי בלבד. אל תסיק, אל תפרש, ואל תמציא תאריך שאינו כתוב במפורש. - חלץ רק תכניות שמופיע לגביהן מידע-תוקף או ייעוד ברור. דלג על אזכור-אגב ללא פרטים. - gazette_date: תאריך הפרסום למתן תוקף, בפורמט ISO (YYYY-MM-DD). אם לא צוין תאריך — השאר "". - yalkut_number: מספר ילקוט הפרסומים / י"פ אם צוין (למשל "5965"). אחרת "". - display_name: שם-התכנית כפי שמקובל לכתוב בהחלטה, כולל המילה "תכנית" (למשל "תכנית מי/820"). - plan_number: מזהה-התכנית בלבד, ללא המילה "תכנית" (למשל "מי/820", "5166/ב", "152-0132902", "תמ\\"א 38"). - plan_type: אחד מ- ארצית / מחוזית / מקומית / מפורטת / כוללנית, אם ניתן לקבוע מהטקסט. אחרת "". - purpose: משפט-ייעוד אחד תמציתי (מה התכנית עושה/משנה/קובעת). אחרת "". - raw_quote: ציטוט מילולי של המשפט שממנו חולץ התוקף, עד 200 תווים. ## פלט החזר JSON array בלבד — ללא markdown, ללא הסברים: [ { "plan_number": "מי/820", "display_name": "תכנית מי/820", "plan_type": "מקומית", "gazette_date": "2001-08-09", "yalkut_number": "", "purpose": "משנה את הוראות תכנית מי/200 ומרחיבה את השימושים המותרים באזור חקלאי", "raw_quote": "תוכנית מי/820 ... פורסמה למתן תוקף ביום 9.8.2001" } ] אם אין תכניות עם מידע-תוקף/ייעוד — החזר []. """ def _chunk_text(text: str, max_chars: int = 25000) -> list[str]: """Split a long document at paragraph boundaries (mirrors appraiser extractor).""" if len(text) <= max_chars: return [text] chunks: list[str] = [] pos = 0 while pos < len(text): end = min(pos + max_chars, len(text)) if end < len(text): break_pos = text.rfind("\n\n", pos, end) if break_pos > pos + max_chars // 2: end = break_pos chunks.append(text[pos:end]) pos = end return chunks async def extract_plans_from_text(text: str) -> list[dict]: """Extract plan candidates from arbitrary text via the local claude session. Returns a list of normalized candidate dicts (not yet persisted). Factual only. """ candidates: list[dict] = [] chunks = _chunk_text(text) for i, chunk in enumerate(chunks): chunk_label = f" (חלק {i+1}/{len(chunks)})" if len(chunks) > 1 else "" prompt = ( f"{EXTRACT_PLANS_PROMPT}\n\n" f"--- תחילת מסמך{chunk_label} ---\n{chunk}\n--- סוף מסמך ---" ) result = await claude_session.query_json(prompt, tools="") # no tool_use if not isinstance(result, list): logger.warning( "extract_plans_from_text: chunk %d returned non-list (%s)", i, type(result).__name__, ) continue for item in result: if not isinstance(item, dict): continue num = (item.get("plan_number") or "").strip() if not num: continue candidates.append({ "plan_number": num, "display_name": (item.get("display_name") or "").strip(), "plan_type": (item.get("plan_type") or "").strip(), "gazette_date": (item.get("gazette_date") or "").strip(), "yalkut_number": (item.get("yalkut_number") or "").strip(), "purpose": (item.get("purpose") or "").strip(), "raw_quote": (item.get("raw_quote") or "").strip(), }) return candidates async def upsert_candidates( candidates: list[dict], *, source_case_number: str = "", source_document_id: UUID | None = None, model_used: str = MODEL_TAG, ) -> list[dict]: """Upsert extracted candidates into the registry as pending_review (G10).""" out: list[dict] = [] for c in candidates: try: plan = await db.upsert_plan( plan_number=c["plan_number"], display_name=c.get("display_name", ""), plan_type=c.get("plan_type", ""), gazette_date=c.get("gazette_date") or None, yalkut_number=c.get("yalkut_number", ""), purpose=c.get("purpose", ""), review_status="pending_review", source_case_number=source_case_number, source_document_id=source_document_id, model_used=model_used, ) out.append(plan) except ValueError as e: # Don't swallow — surface the bad candidate so it isn't silently dropped. logger.warning("upsert_candidates: skipped %r — %s", c.get("plan_number"), e) return out async def extract_plans_for_case(case_id: UUID) -> dict: """Extract plan candidates from every document with text in the case. Upserts them into the registry as pending_review. Thorough by design (we do not pre-filter by doc_type — a plan's validity can be cited anywhere). Returns a summary for serialization back to the caller. """ case = await db.get_case(case_id) source_case_number = (case or {}).get("case_number", "") or "" docs = await db.list_documents(case_id) by_doc: list[dict] = [] seen_numbers: dict[str, dict] = {} total_candidates = 0 for doc in docs: text = await db.get_document_text(UUID(doc["id"])) if not text: continue try: cands = await extract_plans_from_text(text) except Exception as e: # noqa: BLE001 — record, don't swallow logger.exception("extract_plans_for_case: failed on doc %s", doc["id"]) by_doc.append({ "document_id": doc["id"], "title": doc.get("title", ""), "status": "error", "error": str(e), "candidates": 0, }) continue plans = await upsert_candidates( cands, source_case_number=source_case_number, source_document_id=UUID(doc["id"]), ) total_candidates += len(cands) for p in plans: seen_numbers[p["plan_number"]] = p by_doc.append({ "document_id": doc["id"], "title": doc.get("title", ""), "status": "completed", "candidates": len(cands), }) # Surface near-duplicates for the chair to merge manually (G10) — never # auto-merged. A variant of an existing plan written differently won't share # the normalized key, so flag it here instead of silently creating a dup. plans_out = list(seen_numbers.values()) dup_hits = 0 for p in plans_out: sims = await db.find_similar_plans( p["plan_number"], p.get("display_name", ""), exclude_id=UUID(p["id"]), ) p["possible_duplicates"] = sims dup_hits += len(sims) return { "status": "completed", "case_number": source_case_number, "documents_scanned": len(by_doc), "total_candidates": total_candidates, "distinct_plans": len(plans_out), "possible_duplicate_hits": dup_hits, "plans": plans_out, "by_document": by_doc, }