Files
legal-ai/mcp-server/src/legal_mcp/services/references_extractor.py
Chaim 38a61712bc Fix plan regex: require numeric identifier after תב"ע
Previously matched any word after תב"ע (e.g., "תב"ע ואין", "תב"ע קיפחה").
Now requires a plan number (digits/hyphens) — reduces false positives from 24 to 4
on the Hecht case test.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-03 10:50:56 +00:00

201 lines
7.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""זיהוי תכניות, פסיקה וחקיקה במסמכים משפטיים.
שלוש קטגוריות:
1. תכניות (תב"ע, תמ"א, תכניות מקומיות/מחוזיות)
2. פסיקה (עע"מ, בג"ץ, ע"א, עררים)
3. חקיקה (חוק התכנון, חוק מיסוי מקרקעין, וכו')
"""
from __future__ import annotations
import json
import logging
import re
from uuid import UUID
from legal_mcp.services import db
logger = logging.getLogger(__name__)
# ── Regex patterns ────────────────────────────────────────────────────

# Planning schemes. Every source string is compiled once at import time and
# the list order is preserved, because extract_plans walks it front to back.
PLAN_PATTERNS = [
    re.compile(source)
    for source in (
        # National outline plan (תמ"א): a number, optionally followed by a
        # "-" or "/" separated suffix.
        r'תמ"א\s*[\-]?\s*(\d+)(?:\s*[\-/]\s*(\S+))?',
        # "Outline plan" wording with an optional level (national / district /
        # local); a numeric identifier is mandatory.
        r'תכנית\s+(?:מתאר\s+)?(?:ארצית|מחוזית|מקומית)?\s*(?:מס[\'"]?\s*)?(\d[\d/\-\.]+\S*)',
        # Town building scheme (תב"ע): must be followed by a plan number, so
        # ordinary words after the abbreviation are not matched.
        r'תב"ע\s+(?:מס[\'"]?\s*)?(\d[\d/\-\.]+\S*)',
        # Bare "plan <number>" form, e.g. 102-1170893 or 62/3.
        r'תכנית\s+(?:מס[\'"]?\s*)?(\d[\d/\-\.]+\S*)',
    )
]
# Case-law citations. Built by mapping re.compile over the raw sources so the
# list order (court prefix, appeal with district, appeal without district)
# stays exactly as extract_case_law expects to iterate it.
CASE_LAW_PATTERNS = list(
    map(
        re.compile,
        (
            # Court-type abbreviation followed by the case number.
            # Group 1 is the abbreviation, group 2 the number.
            r'(עע"מ|בג"ץ|ע"א|בר"ם|עת"מ|עמ"נ|ע"ע|רע"א|דנ"א|בש"א)\s*(\d[\d/\-]+)',
            # Planning appeal (ערר) with a district name, with or without
            # parentheses. Group 1 is the district, group 2 the number.
            r'ערר\s*\(?\s*(מרכז|ירושלים|חי\'?פה?|ת"א|תל[- ]אביב|דרום|צפון)\s*\)?\s*(\d[\d/\-]+)',
            # Planning appeal without a district: 3-5 digits, a separator,
            # then 2-4 digits.
            r'ערר\s+(\d{3,5}[\-/]\d{2,4})',
        ),
    )
)
# Legislation citations.
#
# Two spelling details are handled explicitly:
#   * "building" appears both as "והבניה" and "והבנייה", so the second yod is
#     optional (the previous character class only accepted the double-yod
#     form and silently missed the other one).
#   * the Hebrew year may carry a leading "ה" (e.g. "התשכ"ה-1965"), so that
#     prefix is optional as well.
# NOTE(review): the section abbreviation alternative \'\' requires two
# apostrophes after "ס"; confirm whether a single apostrophe was intended.
LEGISLATION_PATTERNS = [
    # Planning and Building Law, optional Hebrew year / 1965, optional
    # ", section N". Group 1 is the law name, group 2 the section (if any).
    re.compile(r'(חוק\s+התכנון\s+והבניי?ה)\s*[,،]?\s*(?:ה?תשכ"ה[- ])?(?:1965)?(?:\s*[,،]\s*ס(?:עיף|\'\')\s*(\d+\S*))?'),
    # Land Taxation Law, optional "(appreciation and purchase)" suffix,
    # optional year, optional ", section N".
    re.compile(r'(חוק\s+מיסוי\s+מקרקעין)\s*(?:\(שבח\s+ורכישה\))?\s*[,،]?\s*(?:ה?תשכ"ג[- ])?(?:1963)?(?:\s*[,،]\s*ס(?:עיף|\'\')\s*(\d+\S*))?'),
    # Planning and Building Regulations; group 2 is the parenthesised title.
    re.compile(r'(תקנות\s+התכנון\s+והבניי?ה)\s*\(([^)]+)\)'),
    # Limitation Law with optional year.
    re.compile(r'(חוק\s+ההתיישנות)\s*(?:ה?תשי"ח[- ])?(?:1958)?'),
    # Generic "section N of <law ...>": group 1 is the section, group 2 the
    # law name plus up to three following words.
    re.compile(r'ס(?:עיף|\'\')\s*(\d+\S*)\s*(?:ל|של)?(חוק\s+\S+(?:\s+\S+){0,3})'),
]
def extract_plans(text: str) -> list[dict]:
    """Find planning-scheme references in a document.

    Every pattern in ``PLAN_PATTERNS`` is run over *text*. Each distinct
    reference is reported once, in pattern order and then position order.

    Args:
        text: Full document text to scan.

    Returns:
        A list of dicts with two keys: ``plan_name`` (the matched reference,
        whitespace-normalised and without trailing punctuation) and
        ``context`` (a single-line snippet of up to 60 characters before and
        100 characters after the match).
    """
    if not text:
        return []

    plans: list[dict] = []
    seen: set[str] = set()
    for pattern in PLAN_PATTERNS:
        for match in pattern.finditer(text):
            # The patterns use \s and a trailing \S*, so the raw match may
            # contain newlines / repeated spaces and end with sentence
            # punctuation. Normalise it so the same plan is not stored twice
            # and the name stays on one line, like the context snippet.
            full = " ".join(match.group(0).split()).rstrip(".,;:")
            if full.endswith(")") and "(" not in full:
                # Closing bracket of an enclosing parenthesis, not part of
                # the plan identifier.
                full = full[:-1].rstrip(".,;:")
            if len(full) < 4 or full in seen:
                continue
            seen.add(full)

            start = max(0, match.start() - 60)
            end = min(len(text), match.end() + 100)
            context = text[start:end].replace("\n", " ").strip()
            plans.append({
                "plan_name": full,
                "context": context,
            })
    return plans
# Heuristic for a party / matter name near a citation: the one or two words
# that follow "עניין", "פרשת" or the "נ'" (versus) abbreviation. Compiled once
# instead of on every citation.
_CASE_NAME_RE = re.compile(r'(?:עניין|פרשת|נ[\'"]?\s+)\s*(\S+(?:\s+\S+)?)')


def extract_case_law(text: str) -> list[dict]:
    """Find case-law citations in a document.

    Every pattern in ``CASE_LAW_PATTERNS`` is run over *text*. Each distinct
    citation is reported once, in pattern order and then position order.

    Args:
        text: Full document text to scan.

    Returns:
        A list of dicts with three keys: ``citation_text`` (the matched
        citation, whitespace-normalised), ``case_name`` (a best-effort name
        taken from the surrounding snippet, or ``""`` when none is found) and
        ``context`` (a single-line snippet of up to 50 characters before and
        100 characters after the match).
    """
    if not text:
        return []

    citations: list[dict] = []
    seen: set[str] = set()
    for pattern in CASE_LAW_PATTERNS:
        for match in pattern.finditer(text):
            # \s* between the court prefix and the number can span a line
            # break, and the number class accepts a dangling "-" or "/".
            # Normalise both so identical citations de-duplicate.
            full = " ".join(match.group(0).split()).rstrip("-/")
            if not full or full in seen:
                continue
            seen.add(full)

            start = max(0, match.start() - 50)
            end = min(len(text), match.end() + 100)
            context = text[start:end].replace("\n", " ").strip()

            # The name is searched in the whole snippet, so it may come from
            # text either before or after the citation itself.
            name_match = _CASE_NAME_RE.search(context)
            case_name = name_match.group(1).strip() if name_match else ""

            citations.append({
                "citation_text": full,
                "case_name": case_name,
                "context": context,
            })
    return citations
def extract_legislation(text: str) -> list[dict]:
    """Find legislation citations in a document.

    Every pattern in ``LEGISLATION_PATTERNS`` is run over *text*. Each
    distinct citation is reported once, in pattern order and then position
    order.

    Args:
        text: Full document text to scan.

    Returns:
        A list of dicts with two keys: ``statute_text`` (the matched
        citation, whitespace-normalised and without trailing punctuation) and
        ``context`` (a single-line snippet of up to 40 characters before and
        80 characters after the match).
    """
    if not text:
        return []

    legislation: list[dict] = []
    seen: set[str] = set()
    for pattern in LEGISLATION_PATTERNS:
        for match in pattern.finditer(text):
            # The patterns consume optional commas and use \s / \S*, so the
            # raw match can contain line breaks and end with "," or ".".
            # Normalise it so the same statute is not stored twice.
            full = " ".join(match.group(0).split()).rstrip(".,;:،")
            if full.endswith(")") and "(" not in full:
                # Unbalanced closing bracket belongs to the surrounding text.
                full = full[:-1].rstrip(".,;:،")
            if len(full) < 5 or full in seen:
                continue
            seen.add(full)

            start = max(0, match.start() - 40)
            end = min(len(text), match.end() + 80)
            context = text[start:end].replace("\n", " ").strip()
            legislation.append({
                "statute_text": full,
                "context": context,
            })
    return legislation
def extract_all_references(text: str) -> dict:
    """Collect every reference kind found in a document.

    Runs the plan, case-law and legislation extractors (in that order) over
    *text* and returns their results under the keys ``plans``, ``case_law``
    and ``legislation``.
    """
    found_plans = extract_plans(text)
    found_case_law = extract_case_law(text)
    found_legislation = extract_legislation(text)
    return {
        "plans": found_plans,
        "case_law": found_case_law,
        "legislation": found_legislation,
    }
def _citation_matches(key: str, citation: str) -> bool:
    """Return True when one string contains the other on a number boundary.

    This is the original "key in citation or citation in key" test, except
    that an occurrence is rejected when it cuts a number in half (so the key
    "34/05" no longer matches the citation "1234/05").
    """
    shorter, longer = (key, citation) if len(key) <= len(citation) else (citation, key)
    if not shorter:
        # An empty string is a substring of everything; never treat it as a hit.
        return False

    pos = longer.find(shorter)
    while pos != -1:
        end = pos + len(shorter)
        cut_left = pos > 0 and longer[pos - 1].isdigit() and shorter[0].isdigit()
        cut_right = end < len(longer) and longer[end].isdigit() and shorter[-1].isdigit()
        if not (cut_left or cut_right):
            return True
        pos = longer.find(shorter, pos + 1)
    return False


async def extract_and_link_references(
    document_id: UUID,
    case_id: UUID,
    text: str,
) -> dict:
    """Extract references from a document and store them in its metadata.

    Plans, case law and legislation are extracted from *text*. Each case-law
    citation is then compared with the ``case_number`` values of the
    ``case_law`` table and flagged with ``matched_in_db``. Finally a compact
    summary is written under ``metadata["references"]`` of the document via
    ``db.update_document``.

    Args:
        document_id: Document whose metadata receives the references.
        case_id: Not used by this function; kept for interface compatibility.
        text: Full document text to scan.

    Returns:
        A dict with the counts ``plans``, ``case_law``, ``case_law_linked``
        and ``legislation``, plus ``details`` holding the full extraction
        result (including the ``matched_in_db`` flags).
    """
    refs = extract_all_references(text)

    # Hold the pooled connection only for the query itself; the db helpers
    # called further down are not handed this connection.
    pool = await db.get_pool()
    async with pool.acquire() as conn:
        case_laws = await conn.fetch("SELECT id, case_number, case_name FROM case_law")

    # Lookup of case number (and its trailing numeric token) -> case_law id.
    case_law_map = {}
    for cl in case_laws:
        case_number = (cl["case_number"] or "").strip()
        if not case_number:
            # NULL / blank numbers would crash on split() or match everything.
            continue
        case_law_map[case_number] = cl["id"]
        parts = case_number.split()
        # Only alias the last token when it looks like a number; a trailing
        # word (e.g. a district name) would link unrelated citations.
        if len(parts) > 1 and any(ch.isdigit() for ch in parts[-1]):
            case_law_map[parts[-1]] = cl["id"]

    linked = 0
    for cit in refs["case_law"]:
        case_law_id = next(
            (
                cl_id
                for key, cl_id in case_law_map.items()
                if _citation_matches(key, cit["citation_text"])
            ),
            None,
        )
        cit["matched_in_db"] = case_law_id is not None
        if case_law_id is not None:
            linked += 1

    # Store references in document metadata
    doc = await db.get_document(document_id)
    if doc:
        existing_metadata = doc.get("metadata") or {}
        if isinstance(existing_metadata, str):
            existing_metadata = json.loads(existing_metadata)
        existing_metadata["references"] = {
            "plans": [{"plan_name": p["plan_name"]} for p in refs["plans"]],
            "case_law": [
                {
                    "citation": c["citation_text"],
                    "case_name": c.get("case_name", ""),
                    "in_db": c.get("matched_in_db", False),
                }
                for c in refs["case_law"]
            ],
            "legislation": [{"statute": s["statute_text"]} for s in refs["legislation"]],
        }
        await db.update_document(document_id, metadata=existing_metadata)
    else:
        # Previously silent: the caller got counts back although nothing was saved.
        logger.warning(
            "Document %s not found; extracted references were not persisted",
            document_id,
        )

    return {
        "plans": len(refs["plans"]),
        "case_law": len(refs["case_law"]),
        "case_law_linked": linked,
        "legislation": len(refs["legislation"]),
        "details": refs,
    }