# Change note: the תב"ע pattern previously matched any word following תב"ע (e.g., "תב"ע ואין", "תב"ע קיפחה"). It now requires a plan number (digits/hyphens), which reduced false positives from 24 to 4 on the Hecht case test.
# Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
# File info: 201 lines, 7.0 KiB, Python
"""Detection of plans, case law, and legislation in legal documents.

Three categories:
1. Plans (תב"ע, תמ"א, local/district plans)
2. Case law (עע"מ, בג"ץ, ע"א, appeals)
3. Legislation (Planning and Building Law, Land Taxation Law, etc.)
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import logging
|
||
import re
|
||
from uuid import UUID
|
||
|
||
from legal_mcp.services import db
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# ── Regex patterns ────────────────────────────────────────────────────
|
||
|
||
# Plans (תכניות)
|
||
# Regex sources for planning-scheme citations, tried in this order.
# Every pattern requires a numeric identifier, so a bare keyword followed by
# an ordinary word is never reported as a plan.
_PLAN_PATTERN_SOURCES = (
    # National outline plan (TAMA): a number, optionally followed by "-" or
    # "/" and a free-form suffix.
    r'תמ"א\s*[\-]?\s*(\d+)(?:\s*[\-/]\s*(\S+))?',
    # "Plan" keyword with optional "outline", optional level
    # (national / district / local) and optional "no." abbreviation; the
    # identifier starts with a digit and is at least two characters long.
    r'תכנית\s+(?:מתאר\s+)?(?:ארצית|מחוזית|מקומית)?\s*(?:מס[\'"]?\s*)?(\d[\d/\-\.]+\S*)',
    # Zoning plan (TABA): the identifier must start with a digit.
    r'תב"ע\s+(?:מס[\'"]?\s*)?(\d[\d/\-\.]+\S*)',
    # "Plan" keyword directly followed by a plan number, e.g. 102-1170893 or
    # 62/3. NOTE(review): everything this matches is also matched by the
    # second pattern above; it is kept so the list keeps its four entries.
    r'תכנית\s+(?:מס[\'"]?\s*)?(\d[\d/\-\.]+\S*)',
)

PLAN_PATTERNS = list(map(re.compile, _PLAN_PATTERN_SOURCES))
|
||
|
||
# Case law (פסיקה)
|
||
# Regex sources for case-law citations, tried in this order.
_CASE_LAW_PATTERN_SOURCES = (
    # Court-proceeding abbreviation (ten alternatives, see the alternation)
    # followed by a case number made of digits, "/" and "-".
    r'(עע"מ|בג"ץ|ע"א|בר"ם|עת"מ|עמ"נ|ע"ע|רע"א|דנ"א|בש"א)\s*(\d[\d/\-]+)',
    # Appeal-committee case ("arar") with a district name, optionally in
    # parentheses, followed by the case number.
    r'ערר\s*\(?\s*(מרכז|ירושלים|חי\'?פה?|ת"א|תל[- ]אביב|דרום|צפון)\s*\)?\s*(\d[\d/\-]+)',
    # Appeal-committee case without a district: 3-5 digits, a "-" or "/"
    # separator, then 2-4 digits.
    r'ערר\s+(\d{3,5}[\-/]\d{2,4})',
)

CASE_LAW_PATTERNS = [re.compile(source) for source in _CASE_LAW_PATTERN_SOURCES]
|
||
|
||
# Legislation (חקיקה)
|
||
# Patterns for statute citations, tried in this order.
#
# Shared conventions:
# * The comma after a law name is consumed only when a year follows
#   (lookahead). Otherwise it is left for the optional ", section N" tail,
#   and a name followed by an unrelated clause does not end in a comma.
# * The Hebrew year may carry the definite-article prefix and may be joined
#   to the Gregorian year by "-", an en dash (U+2013), a maqaf (U+05BE) or a
#   space.
# * Group numbering is: group 1 = law name, group 2 = section number (or the
#   parenthesised title for the regulations pattern).
LEGISLATION_PATTERNS = [
    # Planning and Building Law, 1965. Both spellings of "building" (single
    # and double yod) are accepted.
    re.compile(
        r'(חוק\s+התכנון\s+והבניי?ה)'
        r'\s*(?:[,،]\s*(?=ה?תשכ"ה|1965))?'
        r'(?:ה?תשכ"ה\s*[\-\u2013\u05be ]\s*)?(?:1965)?'
        r'(?:\s*[,،]\s*ס(?:עיף|\'|ע\')\s*(\d+\S*))?'
    ),
    # Land Taxation Law, 1963, with its optional parenthesised subtitle.
    re.compile(
        r'(חוק\s+מיסוי\s+מקרקעין)\s*(?:\(שבח\s+ורכישה\))?'
        r'\s*(?:[,،]\s*(?=ה?תשכ"ג|1963))?'
        r'(?:ה?תשכ"ג\s*[\-\u2013\u05be ]\s*)?(?:1963)?'
        r'(?:\s*[,،]\s*ס(?:עיף|\'|ע\')\s*(\d+\S*))?'
    ),
    # Planning and Building Regulations followed by a parenthesised title.
    re.compile(r'(תקנות\s+התכנון\s+והבניי?ה)\s*\(([^)]+)\)'),
    # Limitation Law, 1958.
    re.compile(
        r'(חוק\s+ההתיישנות)'
        r'\s*(?:[,،]\s*(?=ה?תשי"ח|1958))?'
        r'(?:ה?תשי"ח\s*[\-\u2013\u05be ]\s*)?(?:1958)?'
    ),
    # Generic "section N of <law ...>": the law name is the keyword plus one
    # to four following words. The "of" particle may be attached to the
    # keyword or be a separate word followed by whitespace.
    re.compile(r'ס(?:עיף|\'|ע\')\s*(\d+\S*)\s*(?:ל|של\s+)?(חוק\s+\S+(?:\s+\S+){0,3})'),
]
|
||
|
||
|
||
def extract_plans(text: str) -> list[dict]:
    """Find planning-scheme references in a document.

    Args:
        text: Document text to scan. Empty or missing text yields no results.

    Returns:
        One dict per distinct reference, ordered by pattern and then by
        position in the text. Each dict has the keys ``plan_name`` (the
        matched text without trailing sentence punctuation) and ``context``
        (a single-line excerpt from 60 characters before the match to 100
        characters after it).
    """
    if not text:
        return []

    plans = []
    seen = set()

    for pattern in PLAN_PATTERNS:
        for match in pattern.finditer(text):
            # The patterns end in a greedy non-whitespace run, so punctuation
            # glued to the identifier ("62/3," or "62/3.") is captured too.
            # Drop it so the same plan is not recorded once per punctuation
            # variant.
            full = match.group(0).strip().rstrip(".,;:").rstrip()
            if full in seen or len(full) < 4:
                continue
            seen.add(full)

            start = max(0, match.start() - 60)
            end = min(len(text), match.end() + 100)
            context = text[start:end].replace("\n", " ").strip()

            plans.append({
                "plan_name": full,
                "context": context,
            })

    return plans
|
||
|
||
|
||
def extract_case_law(text: str) -> list[dict]:
    """Find case-law citations quoted in a document.

    Args:
        text: Document text to scan.

    Returns:
        One dict per distinct citation, ordered by pattern and then by
        position in the text. Each dict has the keys ``citation_text`` (the
        matched text), ``case_name`` (a best-effort name taken from the
        surrounding excerpt, or an empty string) and ``context`` (a
        single-line excerpt from 50 characters before the match to 100
        characters after it).
    """
    results = []
    already_recorded = set()

    # Flatten "every match of every pattern" into one stream; the order is
    # still pattern by pattern, then by position.
    hits = (hit for regex in CASE_LAW_PATTERNS for hit in regex.finditer(text))

    for hit in hits:
        citation = hit.group(0).strip()
        if citation in already_recorded:
            continue
        already_recorded.add(citation)

        window_start = max(hit.start() - 50, 0)
        window_end = min(hit.end() + 100, len(text))
        excerpt = text[window_start:window_end].replace("\n", " ").strip()

        # Take the one or two words that follow a name marker ("matter of",
        # "affair of", or the "v." abbreviation) anywhere in the excerpt.
        name_hit = re.search(r'(?:עניין|פרשת|נ[\'"]?\s+)\s*(\S+(?:\s+\S+)?)', excerpt)

        results.append({
            "citation_text": citation,
            "case_name": name_hit.group(1).strip() if name_hit else "",
            "context": excerpt,
        })

    return results
|
||
|
||
|
||
def extract_legislation(text: str) -> list[dict]:
    """Find statute citations quoted in a document.

    Args:
        text: Document text to scan. Empty or missing text yields no results.

    Returns:
        One dict per distinct citation, ordered by pattern and then by
        position in the text. Each dict has the keys ``statute_text`` (the
        matched text without trailing sentence punctuation) and ``context``
        (a single-line excerpt from 40 characters before the match to 80
        characters after it).
    """
    if not text:
        return []

    legislation = []
    seen = set()

    for pattern in LEGISLATION_PATTERNS:
        for match in pattern.finditer(text):
            # A match can end in a comma or period picked up from the
            # sentence (optional comma, greedy non-whitespace tails). Strip
            # it so "law" and "law," are not stored as two citations.
            # Closing parentheses are kept: they can be part of the citation.
            full = match.group(0).strip().rstrip(",،.;:").rstrip()
            if full in seen or len(full) < 5:
                continue
            seen.add(full)

            start = max(0, match.start() - 40)
            end = min(len(text), match.end() + 80)
            context = text[start:end].replace("\n", " ").strip()

            legislation.append({
                "statute_text": full,
                "context": context,
            })

    return legislation
|
||
|
||
|
||
def extract_all_references(text: str) -> dict:
    """Collect every kind of reference found in a document.

    Runs the plan, case-law and legislation extractors, in that order, over
    the same text.

    Args:
        text: Document text to scan.

    Returns:
        A dict with the keys ``plans``, ``case_law`` and ``legislation``,
        each holding the list returned by the matching extractor.
    """
    plans = extract_plans(text)
    case_law = extract_case_law(text)
    legislation = extract_legislation(text)
    return dict(plans=plans, case_law=case_law, legislation=legislation)
|
||
|
||
|
||
def _index_case_law(rows) -> dict:
    """Build a lookup from case-number strings to ``case_law`` record ids.

    Each row contributes its full ``case_number`` and, when that value has
    several whitespace-separated parts, its last part as well (presumably the
    bare number without the court prefix). Rows whose case number is missing
    or blank are skipped: an empty key is contained in every citation and
    would link everything to that row.
    """
    index = {}
    for row in rows:
        raw_number = row["case_number"]
        case_number = str(raw_number).strip() if raw_number is not None else ""
        if not case_number:
            continue
        index[case_number] = row["id"]
        parts = case_number.split()
        if len(parts) > 1:
            index[parts[-1]] = row["id"]
    return index


def _find_case_law_id(citation: str, index: dict):
    """Return the id of the first indexed case number matching *citation*.

    A key matches when one of the two strings contains the other and the
    contained text is not directly preceded or followed by a digit. The
    digit boundary stops "123/05" from matching a citation of "5123/05".
    Returns ``None`` when nothing matches.
    """
    if not citation:
        return None
    for key, record_id in index.items():
        shorter, longer = sorted((key, citation), key=len)
        if re.search(r"(?<!\d)" + re.escape(shorter) + r"(?!\d)", longer):
            return record_id
    return None


async def extract_and_link_references(
    document_id: UUID,
    case_id: UUID,
    text: str,
) -> dict:
    """Extract references from a document and store them in the database.

    Finds plans, case law and legislation in ``text``, tries to link each
    case-law citation to an existing ``case_law`` row, and writes a summary
    under the ``references`` key of the document's metadata.

    Args:
        document_id: Id of the document whose metadata is updated.
        case_id: Not used by this function; kept so existing callers keep
            working.
        text: Document text to scan.

    Returns:
        A dict with the counts ``plans``, ``case_law``, ``case_law_linked``
        and ``legislation``, plus ``details`` holding the full extraction
        result. Every case-law entry in ``details`` carries a
        ``matched_in_db`` flag.
    """
    refs = extract_all_references(text)
    citations = refs["case_law"]

    pool = await db.get_pool()
    linked = 0

    # Nothing to link means no reason to read the whole case_law table.
    if citations:
        # Hold the pooled connection only for the query itself; the matching
        # below and the document calls further down do not need it.
        async with pool.acquire() as conn:
            rows = await conn.fetch("SELECT id, case_number, case_name FROM case_law")

        index = _index_case_law(rows)
        for cit in citations:
            case_law_id = _find_case_law_id(cit["citation_text"], index)
            cit["matched_in_db"] = case_law_id is not None
            if case_law_id is not None:
                linked += 1

    # Store references in document metadata
    doc = await db.get_document(document_id)
    if doc:
        existing_metadata = doc.get("metadata") or {}
        if isinstance(existing_metadata, str):
            # Metadata may arrive as a JSON string; a stored JSON "null"
            # decodes to None, which cannot take the key assignment below.
            existing_metadata = json.loads(existing_metadata) or {}
        existing_metadata["references"] = {
            "plans": [{"plan_name": plan["plan_name"]} for plan in refs["plans"]],
            "case_law": [
                {
                    "citation": cit["citation_text"],
                    "case_name": cit.get("case_name", ""),
                    "in_db": cit.get("matched_in_db", False),
                }
                for cit in citations
            ],
            "legislation": [
                {"statute": statute["statute_text"]} for statute in refs["legislation"]
            ],
        }
        await db.update_document(document_id, metadata=existing_metadata)

    return {
        "plans": len(refs["plans"]),
        "case_law": len(citations),
        "case_law_linked": linked,
        "legislation": len(refs["legislation"]),
        "details": refs,
    }
|