Remove din-leumi: fully separate into standalone service

- Removed din-leumi imports, endpoints, and processing from app.py
- Removed bundled din-leumi source from repo
- Simplified Dockerfile (no din-leumi dependency)
- din-leumi now runs as its own Coolify application

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-04 08:34:35 +00:00
parent 324807ff1d
commit cb41867bc9
16 changed files with 3 additions and 1549 deletions

View File

@@ -11,19 +11,15 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
COPY mcp-server/pyproject.toml /app/mcp-server/pyproject.toml
COPY mcp-server/src/ /app/mcp-server/src/
# Copy Din Leumi MCP server source
COPY din-leumi/ /app/din-leumi/
# Install both MCP servers + web deps
RUN pip install --no-cache-dir /app/mcp-server /app/din-leumi && \
# Install MCP server + web deps
RUN pip install --no-cache-dir /app/mcp-server && \
pip install --no-cache-dir fastapi uvicorn python-multipart
# Copy web app
COPY web/ /app/web/
ENV PYTHONPATH=/app/mcp-server/src:/app/din-leumi/src
ENV PYTHONPATH=/app/mcp-server/src
ENV DOTENV_PATH=/dev/null
ENV DOTENV_PATH=/home/chaim/.env
EXPOSE 8080

View File

@@ -1,25 +0,0 @@
[project]
name = "din-leumi"
version = "0.1.0"
description = "MCP server for cataloging and searching National Insurance court decisions"
requires-python = ">=3.10"
dependencies = [
"mcp[cli]>=1.0.0",
"asyncpg>=0.29.0",
"pgvector>=0.3.0",
"voyageai>=0.3.0",
"anthropic>=0.40.0",
"python-dotenv>=1.0.0",
"pydantic>=2.0.0",
"pymupdf>=1.25.0",
"python-docx>=1.1.0",
"striprtf>=0.0.26",
"pillow>=10.0.0",
]
[build-system]
requires = ["setuptools>=68.0"]
build-backend = "setuptools.build_meta"
[tool.setuptools.packages.find]
where = ["src"]

View File

@@ -1,4 +0,0 @@
"""Allow running with: python -m din_leumi"""
from din_leumi.server import main
main()

View File

@@ -1,36 +0,0 @@
"""Configuration loaded from central .env file."""
import os
from pathlib import Path
from dotenv import load_dotenv
# Load from central .env or override path
dotenv_path = os.environ.get("DOTENV_PATH", str(Path.home() / ".env"))
load_dotenv(dotenv_path)
# PostgreSQL - uses shared server but separate database
POSTGRES_URL = os.environ.get(
"DIN_LEUMI_POSTGRES_URL",
f"postgres://{os.environ.get('POSTGRES_USER', 'legal_ai')}:"
f"{os.environ.get('POSTGRES_PASSWORD', '')}@"
f"{os.environ.get('POSTGRES_HOST', '127.0.0.1')}:"
f"{os.environ.get('POSTGRES_PORT', '5433')}/"
f"{os.environ.get('DIN_LEUMI_POSTGRES_DB', 'din_leumi')}",
)
# Voyage AI
VOYAGE_API_KEY = os.environ.get("VOYAGE_API_KEY", "")
VOYAGE_MODEL = "voyage-3-large"
VOYAGE_DIMENSIONS = 1024
# Anthropic (for Claude Vision OCR)
ANTHROPIC_API_KEY = os.environ.get("ANTHROPIC_API_KEY", "")
# Data directory
DATA_DIR = Path(os.environ.get("DIN_LEUMI_DATA_DIR", str(Path.home() / "din-leumi" / "data")))
DECISIONS_DIR = DATA_DIR / "decisions"
# Chunking parameters
CHUNK_SIZE_TOKENS = 600
CHUNK_OVERLAP_TOKENS = 100

View File

@@ -1,156 +0,0 @@
"""Din Leumi - MCP Server entry point.
Run with: python -m din_leumi.server
"""
from __future__ import annotations
import logging
import sys
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from mcp.server.fastmcp import FastMCP
# Configure logging to stderr (stdout is reserved for JSON-RPC)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
stream=sys.stderr,
)
logger = logging.getLogger("din_leumi")
@asynccontextmanager
async def lifespan(server: FastMCP) -> AsyncIterator[None]:
"""Initialize DB schema on startup, close pool on shutdown."""
from din_leumi.services.db import close_pool, init_schema
logger.info("Initializing database schema...")
await init_schema()
logger.info("Din Leumi MCP server ready")
try:
yield
finally:
await close_pool()
logger.info("Din Leumi MCP server stopped")
# Create MCP server
mcp = FastMCP(
"Din Leumi - דין לאומי",
instructions="מערכת לקטלוג וחיפוש סמנטי של פסקי דין בתחום ביטוח לאומי",
lifespan=lifespan,
)
# ── Import tool modules ────────────────────────────────────────────
from din_leumi.tools import decisions, search # noqa: E402
# ── Decision management ────────────────────────────────────────────
@mcp.tool()
async def decision_upload(
file_path: str,
title: str = "",
court: str = "",
decision_date: str = "",
case_number: str = "",
judge: str = "",
parties_appellant: str = "",
parties_respondent: str = "המוסד לביטוח לאומי",
topics: list[str] | None = None,
outcome: str = "",
) -> str:
"""העלאה ועיבוד פסק דין של בית דין לעבודה בתחום ביטוח לאומי (PDF/DOCX/RTF/TXT).
מחלץ טקסט, יוצר chunks ו-embeddings לחיפוש סמנטי.
outcome: accepted/rejected/partial/remanded."""
return await decisions.decision_upload(
file_path, title, court, decision_date, case_number,
judge, parties_appellant, parties_respondent, topics, outcome,
)
@mcp.tool()
async def decision_search(
query: str,
limit: int = 10,
court: str = "",
topic: str = "",
date_from: str = "",
date_to: str = "",
outcome: str = "",
) -> str:
"""חיפוש סמנטי בפסקי דין של ביטוח לאומי.
ניתן לסנן לפי בית משפט, נושא, טווח תאריכים ותוצאה.
נושאים נפוצים: נכות כללית, נכות מעבודה, דמי לידה, תאונת עבודה, גמלת הבטחת הכנסה, דמי אבטלה."""
return await search.decision_search(
query, limit, court, topic, date_from, date_to, outcome,
)
@mcp.tool()
async def decision_list(
court: str = "",
topic: str = "",
judge: str = "",
date_from: str = "",
date_to: str = "",
outcome: str = "",
limit: int = 50,
) -> str:
"""רשימת פסקי דין לפי מטאדאטא. סינון אופציונלי לפי בית משפט, נושא, שופט, תאריכים, תוצאה."""
return await decisions.decision_list(
court, topic, judge, date_from, date_to, outcome, limit,
)
@mcp.tool()
async def decision_get(
decision_id: str = "",
case_number: str = "",
) -> str:
"""פרטי פסק דין מלאים כולל טקסט מחולץ. חיפוש לפי decision_id או case_number."""
return await decisions.decision_get(decision_id, case_number)
@mcp.tool()
async def decision_update(
decision_id: str,
title: str = "",
court: str = "",
decision_date: str = "",
case_number: str = "",
judge: str = "",
parties_appellant: str = "",
parties_respondent: str = "",
topics: list[str] | None = None,
outcome: str = "",
summary: str = "",
) -> str:
"""עדכון מטאדאטא של פסק דין."""
return await decisions.decision_update(
decision_id, title, court, decision_date, case_number,
judge, parties_appellant, parties_respondent, topics, outcome, summary,
)
@mcp.tool()
async def decision_delete(decision_id: str) -> str:
"""מחיקת פסק דין וכל ה-chunks שלו מהמערכת."""
return await decisions.decision_delete(decision_id)
@mcp.tool()
async def system_status() -> str:
"""סטטוס מערכת דין לאומי - מספר פסקי דין, chunks, סטטיסטיקות."""
return await search.system_status()
def main():
mcp.run(transport="stdio")
if __name__ == "__main__":
main()

View File

@@ -1,132 +0,0 @@
"""Legal document chunker - splits text into sections and chunks for RAG.
Adapted for National Insurance (Bituach Leumi) court decisions.
"""
from __future__ import annotations
import re
from dataclasses import dataclass
from din_leumi import config
# Hebrew legal section headers for NI court decisions
SECTION_PATTERNS = [
(r"רקע\s*עובדתי|רקע\s*כללי|העובדות|הרקע|עובדות\s*המקרה", "facts"),
(r"טענות\s*התובע[ת]?|טענות\s*המבוטח[ת]?|טענות\s*המערער[ת]?|עיקר\s*טענות\s*התובע", "claimant_claims"),
(r"טענות\s*הנתבע[ת]?|טענות\s*המוסד|עיקר\s*טענות\s*הנתבע|תשובת\s*המוסד", "respondent_claims"),
(r"דיון\s*והכרעה|דיון|הכרעה|ניתוח\s*משפטי|המסגרת\s*המשפטית|הכרעת\s*הדין", "legal_analysis"),
(r"מסקנ[הות]|סיכום", "conclusion"),
(r"סוף\s*דבר|לסיכום|התוצאה|אשר\s*על\s*כן", "ruling"),
(r"מבוא|פתיחה|לפניי|לפני[נו]?", "intro"),
(r"הדין\s*החל|המסגרת\s*הנורמטיבית|הוראות\s*החוק", "legal_framework"),
]
@dataclass
class Chunk:
content: str
section_type: str = "other"
page_number: int | None = None
chunk_index: int = 0
def chunk_document(
text: str,
chunk_size: int = config.CHUNK_SIZE_TOKENS,
overlap: int = config.CHUNK_OVERLAP_TOKENS,
) -> list[Chunk]:
"""Split a legal document into chunks, respecting section boundaries."""
if not text.strip():
return []
sections = _split_into_sections(text)
chunks: list[Chunk] = []
idx = 0
for section_type, section_text in sections:
section_chunks = _split_section(section_text, chunk_size, overlap)
for chunk_text in section_chunks:
chunks.append(Chunk(
content=chunk_text,
section_type=section_type,
chunk_index=idx,
))
idx += 1
return chunks
def _split_into_sections(text: str) -> list[tuple[str, str]]:
"""Split text into (section_type, text) pairs based on Hebrew headers."""
markers: list[tuple[int, str]] = []
for pattern, section_type in SECTION_PATTERNS:
for match in re.finditer(pattern, text):
markers.append((match.start(), section_type))
if not markers:
return [("other", text)]
markers.sort(key=lambda x: x[0])
sections: list[tuple[str, str]] = []
# Text before first section
if markers[0][0] > 0:
intro_text = text[: markers[0][0]].strip()
if intro_text:
sections.append(("intro", intro_text))
# Each section
for i, (pos, section_type) in enumerate(markers):
end = markers[i + 1][0] if i + 1 < len(markers) else len(text)
section_text = text[pos:end].strip()
if section_text:
sections.append((section_type, section_text))
return sections
def _split_section(text: str, chunk_size: int, overlap: int) -> list[str]:
"""Split a section into overlapping chunks by paragraphs.
Uses approximate token counting (Hebrew ~1.5 chars per token).
"""
if not text.strip():
return []
paragraphs = [p.strip() for p in text.split("\n") if p.strip()]
chunks: list[str] = []
current: list[str] = []
current_tokens = 0
for para in paragraphs:
para_tokens = _estimate_tokens(para)
if current_tokens + para_tokens > chunk_size and current:
chunks.append("\n".join(current))
# Keep overlap
overlap_paras: list[str] = []
overlap_tokens = 0
for p in reversed(current):
pt = _estimate_tokens(p)
if overlap_tokens + pt > overlap:
break
overlap_paras.insert(0, p)
overlap_tokens += pt
current = overlap_paras
current_tokens = overlap_tokens
current.append(para)
current_tokens += para_tokens
if current:
chunks.append("\n".join(current))
return chunks
def _estimate_tokens(text: str) -> int:
"""Rough token estimate for Hebrew text (~1.5 chars per token)."""
return max(1, len(text) // 2)

View File

@@ -1,374 +0,0 @@
"""Database service - asyncpg connection pool and queries for din-leumi."""
from __future__ import annotations
import json
import logging
from datetime import date
from uuid import UUID, uuid4
import asyncpg
from pgvector.asyncpg import register_vector
from din_leumi import config
logger = logging.getLogger(__name__)
_pool: asyncpg.Pool | None = None
async def get_pool() -> asyncpg.Pool:
global _pool
if _pool is None:
conn = await asyncpg.connect(config.POSTGRES_URL)
await conn.execute('CREATE EXTENSION IF NOT EXISTS vector')
await conn.execute('CREATE EXTENSION IF NOT EXISTS "uuid-ossp"')
await conn.close()
_pool = await asyncpg.create_pool(
config.POSTGRES_URL,
min_size=2,
max_size=10,
init=_init_connection,
)
return _pool
async def _init_connection(conn: asyncpg.Connection) -> None:
await register_vector(conn)
async def close_pool() -> None:
global _pool
if _pool:
await _pool.close()
_pool = None
# ── Schema ──────────────────────────────────────────────────────────
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS decisions (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
title TEXT NOT NULL,
file_path TEXT NOT NULL,
extracted_text TEXT DEFAULT '',
extraction_status TEXT DEFAULT 'pending',
court TEXT DEFAULT '',
decision_date DATE,
case_number TEXT DEFAULT '',
judge TEXT DEFAULT '',
parties_appellant TEXT DEFAULT '',
parties_respondent TEXT DEFAULT 'המוסד לביטוח לאומי',
topics JSONB DEFAULT '[]',
outcome TEXT DEFAULT '',
summary TEXT DEFAULT '',
page_count INTEGER,
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now()
);
CREATE TABLE IF NOT EXISTS decision_chunks (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
decision_id UUID REFERENCES decisions(id) ON DELETE CASCADE,
chunk_index INTEGER NOT NULL,
content TEXT NOT NULL,
section_type TEXT DEFAULT 'other',
embedding vector(1024),
page_number INTEGER,
created_at TIMESTAMPTZ DEFAULT now()
);
CREATE INDEX IF NOT EXISTS idx_chunks_decision ON decision_chunks(decision_id);
CREATE INDEX IF NOT EXISTS idx_decisions_court ON decisions(court);
CREATE INDEX IF NOT EXISTS idx_decisions_date ON decisions(decision_date);
CREATE INDEX IF NOT EXISTS idx_decisions_case_number ON decisions(case_number);
CREATE INDEX IF NOT EXISTS idx_decisions_topics ON decisions USING gin(topics);
"""
# IVFFlat index requires data to exist; create after first batch of decisions
IVFFLAT_INDEX_SQL = """
CREATE INDEX IF NOT EXISTS idx_chunks_embedding
ON decision_chunks USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);
"""
async def init_schema() -> None:
pool = await get_pool()
async with pool.acquire() as conn:
await conn.execute(SCHEMA_SQL)
# Check if we have enough data for IVFFlat
count = await conn.fetchval("SELECT count(*) FROM decision_chunks")
if count and count > 100:
await conn.execute(IVFFLAT_INDEX_SQL)
logger.info("IVFFlat index created (%d chunks)", count)
logger.info("Database schema initialized")
async def ensure_ivfflat_index() -> None:
"""Create IVFFlat index if enough data exists."""
pool = await get_pool()
async with pool.acquire() as conn:
count = await conn.fetchval("SELECT count(*) FROM decision_chunks")
if count and count > 100:
await conn.execute(IVFFLAT_INDEX_SQL)
# ── Decision CRUD ───────────────────────────────────────────────────
async def create_decision(
title: str,
file_path: str,
court: str = "",
decision_date: date | None = None,
case_number: str = "",
judge: str = "",
parties_appellant: str = "",
parties_respondent: str = "המוסד לביטוח לאומי",
topics: list[str] | None = None,
outcome: str = "",
) -> dict:
pool = await get_pool()
decision_id = uuid4()
async with pool.acquire() as conn:
await conn.execute(
"""INSERT INTO decisions (id, title, file_path, court, decision_date,
case_number, judge, parties_appellant, parties_respondent,
topics, outcome)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)""",
decision_id, title, file_path, court, decision_date,
case_number, judge, parties_appellant, parties_respondent,
json.dumps(topics or []), outcome,
)
return await get_decision(decision_id)
async def get_decision(decision_id: UUID) -> dict | None:
pool = await get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow("SELECT * FROM decisions WHERE id = $1", decision_id)
if row is None:
return None
return _row_to_decision(row)
async def get_decision_by_case_number(case_number: str) -> dict | None:
pool = await get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT * FROM decisions WHERE case_number = $1", case_number
)
if row is None:
return None
return _row_to_decision(row)
async def list_decisions(
court: str = "",
topic: str = "",
judge: str = "",
date_from: date | None = None,
date_to: date | None = None,
outcome: str = "",
limit: int = 50,
) -> list[dict]:
pool = await get_pool()
conditions = []
params: list = []
idx = 1
if court:
conditions.append(f"court ILIKE ${idx}")
params.append(f"%{court}%")
idx += 1
if topic:
conditions.append(f"topics @> ${idx}::jsonb")
params.append(json.dumps([topic]))
idx += 1
if judge:
conditions.append(f"judge ILIKE ${idx}")
params.append(f"%{judge}%")
idx += 1
if date_from:
conditions.append(f"decision_date >= ${idx}")
params.append(date_from)
idx += 1
if date_to:
conditions.append(f"decision_date <= ${idx}")
params.append(date_to)
idx += 1
if outcome:
conditions.append(f"outcome = ${idx}")
params.append(outcome)
idx += 1
where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
params.append(limit)
sql = f"""
SELECT id, title, court, decision_date, case_number, judge,
parties_appellant, parties_respondent, topics, outcome, summary,
extraction_status, page_count, created_at
FROM decisions
{where}
ORDER BY decision_date DESC NULLS LAST, created_at DESC
LIMIT ${idx}
"""
async with pool.acquire() as conn:
rows = await conn.fetch(sql, *params)
return [_row_to_decision(r) for r in rows]
async def update_decision(decision_id: UUID, **fields) -> dict | None:
if not fields:
return await get_decision(decision_id)
pool = await get_pool()
set_clauses = []
values = []
for i, (key, val) in enumerate(fields.items(), start=2):
if key == "topics":
val = json.dumps(val)
set_clauses.append(f"{key} = ${i}")
values.append(val)
set_clauses.append("updated_at = now()")
sql = f"UPDATE decisions SET {', '.join(set_clauses)} WHERE id = $1"
async with pool.acquire() as conn:
await conn.execute(sql, decision_id, *values)
return await get_decision(decision_id)
async def delete_decision(decision_id: UUID) -> bool:
pool = await get_pool()
async with pool.acquire() as conn:
result = await conn.execute("DELETE FROM decisions WHERE id = $1", decision_id)
return result == "DELETE 1"
def _row_to_decision(row: asyncpg.Record) -> dict:
d = dict(row)
if isinstance(d.get("topics"), str):
d["topics"] = json.loads(d["topics"])
for field in ("id",):
if field in d and d[field] is not None:
d[field] = str(d[field])
return d
# ── Chunks & Vectors ───────────────────────────────────────────────
async def store_chunks(
decision_id: UUID,
chunks: list[dict],
) -> int:
"""Store decision chunks with embeddings."""
pool = await get_pool()
async with pool.acquire() as conn:
await conn.execute(
"DELETE FROM decision_chunks WHERE decision_id = $1", decision_id
)
for chunk in chunks:
await conn.execute(
"""INSERT INTO decision_chunks
(decision_id, chunk_index, content, section_type, embedding, page_number)
VALUES ($1, $2, $3, $4, $5, $6)""",
decision_id,
chunk["chunk_index"],
chunk["content"],
chunk.get("section_type", "other"),
chunk["embedding"],
chunk.get("page_number"),
)
return len(chunks)
async def search_similar(
query_embedding: list[float],
limit: int = 10,
court: str = "",
topic: str = "",
date_from: date | None = None,
date_to: date | None = None,
outcome: str = "",
) -> list[dict]:
"""Cosine similarity search on decision chunks with metadata filtering."""
pool = await get_pool()
conditions = []
params: list = [query_embedding, limit]
idx = 3
if court:
conditions.append(f"d.court ILIKE ${idx}")
params.append(f"%{court}%")
idx += 1
if topic:
conditions.append(f"d.topics @> ${idx}::jsonb")
params.append(json.dumps([topic]))
idx += 1
if date_from:
conditions.append(f"d.decision_date >= ${idx}")
params.append(date_from)
idx += 1
if date_to:
conditions.append(f"d.decision_date <= ${idx}")
params.append(date_to)
idx += 1
if outcome:
conditions.append(f"d.outcome = ${idx}")
params.append(outcome)
idx += 1
where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
sql = f"""
SELECT dc.content, dc.section_type, dc.page_number,
dc.decision_id,
d.title, d.case_number, d.court, d.decision_date,
d.judge, d.outcome,
1 - (dc.embedding <=> $1) AS score
FROM decision_chunks dc
JOIN decisions d ON d.id = dc.decision_id
{where}
ORDER BY dc.embedding <=> $1
LIMIT $2
"""
async with pool.acquire() as conn:
rows = await conn.fetch(sql, *params)
results = []
for r in rows:
d = dict(r)
if d.get("decision_id"):
d["decision_id"] = str(d["decision_id"])
results.append(d)
return results
# ── Stats ──────────────────────────────────────────────────────────
async def get_stats() -> dict:
pool = await get_pool()
async with pool.acquire() as conn:
total_decisions = await conn.fetchval("SELECT count(*) FROM decisions")
total_chunks = await conn.fetchval("SELECT count(*) FROM decision_chunks")
completed = await conn.fetchval(
"SELECT count(*) FROM decisions WHERE extraction_status = 'completed'"
)
courts = await conn.fetch(
"SELECT court, count(*) as cnt FROM decisions WHERE court != '' GROUP BY court ORDER BY cnt DESC LIMIT 10"
)
date_range = await conn.fetchrow(
"SELECT min(decision_date) as earliest, max(decision_date) as latest FROM decisions WHERE decision_date IS NOT NULL"
)
return {
"total_decisions": total_decisions,
"completed_decisions": completed,
"total_chunks": total_chunks,
"courts": [{"court": r["court"], "count": r["cnt"]} for r in courts],
"date_range": {
"earliest": str(date_range["earliest"]) if date_range and date_range["earliest"] else None,
"latest": str(date_range["latest"]) if date_range and date_range["latest"] else None,
} if date_range else None,
}

View File

@@ -1,55 +0,0 @@
"""Embedding service using Voyage AI API."""
from __future__ import annotations
import logging
import voyageai
from din_leumi import config
logger = logging.getLogger(__name__)
_client: voyageai.Client | None = None
def _get_client() -> voyageai.Client:
global _client
if _client is None:
_client = voyageai.Client(api_key=config.VOYAGE_API_KEY)
return _client
async def embed_texts(texts: list[str], input_type: str = "document") -> list[list[float]]:
"""Embed a batch of texts using Voyage AI.
Args:
texts: List of texts to embed (max 128 per call).
input_type: "document" for indexing, "query" for search queries.
Returns:
List of embedding vectors (1024 dimensions each).
"""
if not texts:
return []
client = _get_client()
all_embeddings = []
# Voyage AI supports up to 128 texts per batch
for i in range(0, len(texts), 128):
batch = texts[i : i + 128]
result = client.embed(
batch,
model=config.VOYAGE_MODEL,
input_type=input_type,
)
all_embeddings.extend(result.embeddings)
return all_embeddings
async def embed_query(query: str) -> list[float]:
"""Embed a single search query."""
results = await embed_texts([query], input_type="query")
return results[0]

View File

@@ -1,126 +0,0 @@
"""Text extraction from PDF, DOCX, and RTF files.
Primary PDF extraction: Claude Vision API (for scanned documents).
Fallback: PyMuPDF direct text extraction (for born-digital PDFs).
"""
from __future__ import annotations
import base64
import logging
from pathlib import Path
import anthropic
import fitz # PyMuPDF
from docx import Document as DocxDocument
from striprtf.striprtf import rtf_to_text
from din_leumi import config
logger = logging.getLogger(__name__)
_anthropic_client: anthropic.Anthropic | None = None
def _get_anthropic() -> anthropic.Anthropic:
global _anthropic_client
if _anthropic_client is None:
_anthropic_client = anthropic.Anthropic(api_key=config.ANTHROPIC_API_KEY)
return _anthropic_client
async def extract_text(file_path: str) -> tuple[str, int]:
"""Extract text from a document file.
Returns:
Tuple of (extracted_text, page_count).
page_count is 0 for non-PDF files.
"""
path = Path(file_path)
suffix = path.suffix.lower()
if suffix == ".pdf":
return await _extract_pdf(path)
elif suffix == ".docx":
return _extract_docx(path), 0
elif suffix == ".rtf":
return _extract_rtf(path), 0
elif suffix == ".txt":
return path.read_text(encoding="utf-8"), 0
else:
raise ValueError(f"Unsupported file type: {suffix}")
async def _extract_pdf(path: Path) -> tuple[str, int]:
"""Extract text from PDF. Try direct text first, fall back to Claude Vision for scanned pages."""
doc = fitz.open(str(path))
page_count = len(doc)
pages_text: list[str] = []
for page_num in range(page_count):
page = doc[page_num]
# Try direct text extraction first
text = page.get_text().strip()
if len(text) > 50:
# Sufficient text found - born-digital page
pages_text.append(text)
logger.debug("Page %d: direct text extraction (%d chars)", page_num + 1, len(text))
else:
# Likely scanned - use Claude Vision
logger.info("Page %d: using Claude Vision OCR", page_num + 1)
pix = page.get_pixmap(dpi=200)
img_bytes = pix.tobytes("png")
ocr_text = await _ocr_with_claude(img_bytes, page_num + 1)
pages_text.append(ocr_text)
doc.close()
return "\n\n".join(pages_text), page_count
async def _ocr_with_claude(image_bytes: bytes, page_num: int) -> str:
"""OCR a single page image using Claude Vision API."""
client = _get_anthropic()
b64_image = base64.b64encode(image_bytes).decode("utf-8")
message = client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=4096,
messages=[
{
"role": "user",
"content": [
{
"type": "image",
"source": {
"type": "base64",
"media_type": "image/png",
"data": b64_image,
},
},
{
"type": "text",
"text": (
"חלץ את כל הטקסט מהתמונה הזו. זהו מסמך משפטי בעברית. "
"שמור על מבנה הפסקאות המקורי. "
"החזר רק את הטקסט המחולץ, ללא הערות נוספות."
),
},
],
}
],
)
return message.content[0].text
def _extract_docx(path: Path) -> str:
"""Extract text from DOCX file."""
doc = DocxDocument(str(path))
paragraphs = [p.text for p in doc.paragraphs if p.text.strip()]
return "\n\n".join(paragraphs)
def _extract_rtf(path: Path) -> str:
"""Extract text from RTF file."""
rtf_content = path.read_text(encoding="utf-8", errors="replace")
return rtf_to_text(rtf_content)

View File

@@ -1,82 +0,0 @@
"""Decision processing pipeline: extract → chunk → embed → store."""
from __future__ import annotations
import logging
from uuid import UUID
from din_leumi.services import chunker, db, embeddings, extractor
logger = logging.getLogger(__name__)
async def process_decision(decision_id: UUID) -> dict:
"""Full processing pipeline for a decision.
1. Extract text from file
2. Split into chunks
3. Generate embeddings
4. Store chunks + embeddings in DB
Returns processing summary.
"""
decision = await db.get_decision(decision_id)
if not decision:
raise ValueError(f"Decision {decision_id} not found")
await db.update_decision(decision_id, extraction_status="processing")
try:
# Step 1: Extract text
logger.info("Extracting text from %s", decision["file_path"])
text, page_count = await extractor.extract_text(decision["file_path"])
await db.update_decision(
decision_id,
extracted_text=text,
page_count=page_count,
)
# Step 2: Chunk
logger.info("Chunking decision (%d chars)", len(text))
chunks = chunker.chunk_document(text)
if not chunks:
await db.update_decision(decision_id, extraction_status="completed")
return {"status": "completed", "chunks": 0, "message": "No text to chunk"}
# Step 3: Embed
logger.info("Generating embeddings for %d chunks", len(chunks))
texts = [c.content for c in chunks]
embs = await embeddings.embed_texts(texts, input_type="document")
# Step 4: Store
chunk_dicts = [
{
"content": c.content,
"section_type": c.section_type,
"embedding": emb,
"page_number": c.page_number,
"chunk_index": c.chunk_index,
}
for c, emb in zip(chunks, embs)
]
stored = await db.store_chunks(decision_id, chunk_dicts)
await db.update_decision(decision_id, extraction_status="completed")
# Try to create IVFFlat index if we have enough data
await db.ensure_ivfflat_index()
logger.info("Decision processed: %d chunks stored", stored)
return {
"status": "completed",
"chunks": stored,
"pages": page_count,
"text_length": len(text),
}
except Exception as e:
logger.exception("Decision processing failed: %s", e)
await db.update_decision(decision_id, extraction_status="failed")
return {"status": "failed", "error": str(e)}

View File

@@ -1,241 +0,0 @@
"""Decision CRUD tool implementations."""
from __future__ import annotations
import json
import shutil
from datetime import date
from pathlib import Path
from uuid import UUID
from din_leumi import config
from din_leumi.services import db, processor
async def decision_upload(
file_path: str,
title: str = "",
court: str = "",
decision_date: str = "",
case_number: str = "",
judge: str = "",
parties_appellant: str = "",
parties_respondent: str = "המוסד לביטוח לאומי",
topics: list[str] | None = None,
outcome: str = "",
) -> str:
"""Upload and process a court decision."""
source = Path(file_path)
if not source.exists():
return f"❌ הקובץ לא נמצא: {file_path}"
ext = source.suffix.lower()
if ext not in (".pdf", ".docx", ".rtf", ".txt"):
return f"❌ סוג קובץ לא נתמך: {ext}"
# Copy to decisions directory
config.DECISIONS_DIR.mkdir(parents=True, exist_ok=True)
dest = config.DECISIONS_DIR / source.name
if dest.exists():
# Add suffix to avoid overwrite
dest = config.DECISIONS_DIR / f"{source.stem}_{UUID.__class__.__name__[:8]}{ext}"
shutil.copy2(str(source), str(dest))
# Parse date
d_date = None
if decision_date:
try:
d_date = date.fromisoformat(decision_date)
except ValueError:
return f"❌ פורמט תאריך לא תקין: {decision_date}. נדרש YYYY-MM-DD"
# Create decision record
if not title:
title = source.stem
decision = await db.create_decision(
title=title,
file_path=str(dest),
court=court,
decision_date=d_date,
case_number=case_number,
judge=judge,
parties_appellant=parties_appellant,
parties_respondent=parties_respondent,
topics=topics,
outcome=outcome,
)
# Process (extract → chunk → embed → store)
result = await processor.process_decision(UUID(decision["id"]))
status_icon = "" if result["status"] == "completed" else ""
lines = [
f"{status_icon} פסק דין הועלה ועובד",
f" כותרת: {title}",
f" מזהה: {decision['id']}",
]
if case_number:
lines.append(f" מספר תיק: {case_number}")
if result.get("chunks"):
lines.append(f" chunks: {result['chunks']}")
if result.get("pages"):
lines.append(f" עמודים: {result['pages']}")
if result.get("error"):
lines.append(f" שגיאה: {result['error']}")
return "\n".join(lines)
async def decision_get(
decision_id: str = "",
case_number: str = "",
) -> str:
"""Get full decision details."""
if not decision_id and not case_number:
return "❌ נדרש decision_id או case_number"
if decision_id:
decision = await db.get_decision(UUID(decision_id))
else:
decision = await db.get_decision_by_case_number(case_number)
if not decision:
return "❌ פסק דין לא נמצא"
lines = [
f"📄 {decision.get('title', '')}",
f" מזהה: {decision['id']}",
]
if decision.get("case_number"):
lines.append(f" מספר תיק: {decision['case_number']}")
if decision.get("court"):
lines.append(f" בית משפט: {decision['court']}")
if decision.get("decision_date"):
lines.append(f" תאריך: {decision['decision_date']}")
if decision.get("judge"):
lines.append(f" שופט: {decision['judge']}")
if decision.get("parties_appellant"):
lines.append(f" תובע/מערער: {decision['parties_appellant']}")
if decision.get("parties_respondent"):
lines.append(f" נתבע/משיב: {decision['parties_respondent']}")
if decision.get("topics"):
topics = decision["topics"]
if isinstance(topics, str):
topics = json.loads(topics)
if topics:
lines.append(f" נושאים: {', '.join(topics)}")
if decision.get("outcome"):
lines.append(f" תוצאה: {decision['outcome']}")
if decision.get("summary"):
lines.append(f" תקציר: {decision['summary']}")
lines.append(f" סטטוס: {decision.get('extraction_status', 'unknown')}")
if decision.get("page_count"):
lines.append(f" עמודים: {decision['page_count']}")
# Include extracted text (truncated)
text = decision.get("extracted_text", "")
if text:
lines.append("")
lines.append("── טקסט מחולץ ──")
if len(text) > 15000:
lines.append(text[:15000])
lines.append(f"\n... (נקטע, סה\"כ {len(text)} תווים)")
else:
lines.append(text)
return "\n".join(lines)
async def decision_list(
court: str = "",
topic: str = "",
judge: str = "",
date_from: str = "",
date_to: str = "",
outcome: str = "",
limit: int = 50,
) -> str:
"""List decisions with optional filters."""
d_from = date.fromisoformat(date_from) if date_from else None
d_to = date.fromisoformat(date_to) if date_to else None
decisions = await db.list_decisions(
court=court, topic=topic, judge=judge,
date_from=d_from, date_to=d_to,
outcome=outcome, limit=limit,
)
if not decisions:
return "לא נמצאו פסקי דין"
lines = [f"נמצאו {len(decisions)} פסקי דין:\n"]
for d in decisions:
parts = [f"{d.get('title', 'ללא כותרת')}"]
if d.get("case_number"):
parts.append(f" [{d['case_number']}]")
if d.get("court"):
parts.append(f" {d['court']}")
if d.get("decision_date"):
parts.append(f" {d['decision_date']}")
if d.get("outcome"):
parts.append(f" ({d['outcome']})")
lines.append(" ".join(parts))
lines.append(f" מזהה: {d['id']}")
return "\n".join(lines)
async def decision_update(
decision_id: str,
title: str = "",
court: str = "",
decision_date: str = "",
case_number: str = "",
judge: str = "",
parties_appellant: str = "",
parties_respondent: str = "",
topics: list[str] | None = None,
outcome: str = "",
summary: str = "",
) -> str:
"""Update decision metadata."""
fields = {}
if title:
fields["title"] = title
if court:
fields["court"] = court
if decision_date:
fields["decision_date"] = date.fromisoformat(decision_date)
if case_number:
fields["case_number"] = case_number
if judge:
fields["judge"] = judge
if parties_appellant:
fields["parties_appellant"] = parties_appellant
if parties_respondent:
fields["parties_respondent"] = parties_respondent
if topics is not None:
fields["topics"] = topics
if outcome:
fields["outcome"] = outcome
if summary:
fields["summary"] = summary
if not fields:
return "❌ לא צוינו שדות לעדכון"
result = await db.update_decision(UUID(decision_id), **fields)
if not result:
return "❌ פסק דין לא נמצא"
return f"✅ פסק דין {decision_id} עודכן ({', '.join(fields.keys())})"
async def decision_delete(decision_id: str) -> str:
"""Delete a decision and all its chunks."""
deleted = await db.delete_decision(UUID(decision_id))
if deleted:
return f"✅ פסק דין {decision_id} נמחק"
return "❌ פסק דין לא נמצא"

View File

@@ -1,97 +0,0 @@
"""Search tool implementations."""
from __future__ import annotations
import json
from datetime import date
from din_leumi.services import db, embeddings
async def decision_search(
query: str,
limit: int = 10,
court: str = "",
topic: str = "",
date_from: str = "",
date_to: str = "",
outcome: str = "",
) -> str:
"""Semantic search across all decisions with optional metadata filters."""
if not query.strip():
return "❌ נדרש טקסט לחיפוש"
# Embed query
query_emb = await embeddings.embed_query(query)
d_from = date.fromisoformat(date_from) if date_from else None
d_to = date.fromisoformat(date_to) if date_to else None
results = await db.search_similar(
query_embedding=query_emb,
limit=limit,
court=court,
topic=topic,
date_from=d_from,
date_to=d_to,
outcome=outcome,
)
if not results:
return "לא נמצאו תוצאות"
lines = [f"🔍 נמצאו {len(results)} תוצאות עבור: \"{query}\"\n"]
for i, r in enumerate(results, 1):
score = r.get("score", 0)
lines.append(f"── תוצאה {i} (ציון: {score:.3f}) ──")
lines.append(f" פסק דין: {r.get('title', '')}")
if r.get("case_number"):
lines.append(f" מספר תיק: {r['case_number']}")
if r.get("court"):
lines.append(f" בית משפט: {r['court']}")
if r.get("decision_date"):
lines.append(f" תאריך: {r['decision_date']}")
if r.get("judge"):
lines.append(f" שופט: {r['judge']}")
if r.get("outcome"):
lines.append(f" תוצאה: {r['outcome']}")
lines.append(f" סוג קטע: {r.get('section_type', 'other')}")
lines.append(f" מזהה: {r.get('decision_id', '')}")
lines.append("")
# Show content snippet
content = r.get("content", "")
if len(content) > 500:
content = content[:500] + "..."
lines.append(f" {content}")
lines.append("")
return "\n".join(lines)
async def system_status() -> str:
"""Get system statistics."""
stats = await db.get_stats()
lines = [
"📊 סטטוס מערכת דין לאומי",
"",
f" סה\"כ פסקי דין: {stats['total_decisions']}",
f" עובדו בהצלחה: {stats['completed_decisions']}",
f" סה\"כ chunks: {stats['total_chunks']}",
]
if stats.get("courts"):
lines.append("")
lines.append(" בתי משפט:")
for c in stats["courts"]:
lines.append(f"{c['court']}: {c['count']}")
if stats.get("date_range"):
dr = stats["date_range"]
if dr.get("earliest"):
lines.append("")
lines.append(f" טווח תאריכים: {dr['earliest']}{dr['latest']}")
return "\n".join(lines)

View File

@@ -16,8 +16,6 @@ from uuid import UUID, uuid4
# Allow importing legal_mcp from the MCP server source
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "mcp-server" / "src"))
# Allow importing din_leumi from its MCP server source
sys.path.insert(0, str(Path.home() / "din-leumi" / "mcp-server" / "src"))
from fastapi import FastAPI, File, HTTPException, UploadFile
from fastapi.responses import FileResponse, StreamingResponse
@@ -28,13 +26,6 @@ from legal_mcp import config
from legal_mcp.services import chunker, db, embeddings, extractor, processor
from legal_mcp.tools import cases as cases_tools, search as search_tools, workflow as workflow_tools, drafting as drafting_tools
# Din Leumi imports (aliased to avoid collision)
from din_leumi import config as dl_config
from din_leumi.services import db as dl_db
from din_leumi.services import processor as dl_processor
from din_leumi.services import extractor as dl_extractor
import anthropic
logger = logging.getLogger(__name__)
@@ -49,12 +40,9 @@ _progress: dict[str, dict] = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
dl_config.DECISIONS_DIR.mkdir(parents=True, exist_ok=True)
await db.init_schema()
await dl_db.init_schema()
yield
await db.close_pool()
await dl_db.close_pool()
app = FastAPI(title="העלאת מסמכים משפטיים", lifespan=lifespan)
@@ -558,94 +546,6 @@ async def api_document_text(doc_id: str):
return {"doc_id": doc_id, "text": text}
# ── Din Leumi Endpoint ────────────────────────────────────────────
class DinLeumiRequest(BaseModel):
filename: str
title: str = ""
@app.post("/api/classify-dinleumi")
async def classify_dinleumi(req: DinLeumiRequest):
"""Upload a decision to Din Leumi with auto metadata extraction."""
source = UPLOAD_DIR / req.filename
if not source.exists() or not source.parent.samefile(UPLOAD_DIR):
raise HTTPException(404, "File not found in uploads")
task_id = str(uuid4())
_progress[task_id] = {"status": "queued", "filename": req.filename}
asyncio.create_task(_process_dinleumi_decision(task_id, source, req))
return {"task_id": task_id}
# ── Metadata Extraction ──────────────────────────────────────────
METADATA_EXTRACTION_PROMPT = """אתה מנתח פסקי דין של בתי דין לעבודה בתחום ביטוח לאומי.
חלץ את המטאדאטא הבאה מתוך פסק הדין והחזר אותה כ-JSON בלבד:
{
"title": "כותרת תיאורית קצרה של פסק הדין",
"court": "שם בית המשפט (למשל: בית הדין האזורי לעבודה תל אביב)",
"decision_date": "YYYY-MM-DD או null אם לא נמצא",
"case_number": "מספר תיק (למשל: בל 12345-06-20)",
"judge": "שם השופט/ת",
"parties_appellant": "שם התובע/מערער",
"parties_respondent": "שם הנתבע/משיב",
"topics": ["רשימת נושאים רלוונטיים מתוך הרשימה למטה"],
"outcome": "accepted/rejected/partial/remanded",
"summary": "תקציר של 2-3 משפטים"
}
נושאים אפשריים: נכות כללית, נכות מעבודה, תאונת עבודה, דמי לידה, דמי אבטלה, גמלת הבטחת הכנסה, גמלת ניידות, גמלת סיעוד, קצבת זקנה, קצבת שאירים, מילואים, דמי פגיעה, נפגעי פעולות איבה
החזר JSON בלבד, ללא טקסט נוסף."""
_anthropic_client: anthropic.Anthropic | None = None
def _get_anthropic() -> anthropic.Anthropic:
global _anthropic_client
if _anthropic_client is None:
_anthropic_client = anthropic.Anthropic(api_key=config.ANTHROPIC_API_KEY)
return _anthropic_client
async def _extract_metadata_with_claude(text: str) -> dict:
"""Extract metadata from decision text using Claude."""
client = _get_anthropic()
# Use first ~5000 chars (usually contains all metadata)
excerpt = text[:5000]
message = client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=1024,
messages=[
{
"role": "user",
"content": f"{METADATA_EXTRACTION_PROMPT}\n\nפסק הדין:\n{excerpt}",
}
],
)
response_text = message.content[0].text.strip()
# Parse JSON from response (handle potential markdown wrapping)
if response_text.startswith("```"):
response_text = response_text.split("```")[1]
if response_text.startswith("json"):
response_text = response_text[4:]
try:
metadata = json.loads(response_text)
except json.JSONDecodeError:
logger.warning("Failed to parse metadata JSON: %s", response_text[:200])
metadata = {}
return metadata
# ── Background Processing ─────────────────────────────────────────
@@ -802,117 +702,3 @@ async def _process_training_document(task_id: str, source: Path, req: ClassifyRe
"chunks": chunk_count,
},
}
async def _process_dinleumi_decision(task_id: str, source: Path, req: DinLeumiRequest):
"""Process a National Insurance court decision with auto metadata extraction."""
from datetime import date as date_type
try:
# Step 1: Copy to din-leumi decisions directory
_progress[task_id] = {"status": "copying", "filename": req.filename}
original_name = re.sub(r"^\d+_", "", source.name)
dest = dl_config.DECISIONS_DIR / original_name
if dest.exists():
dest = dl_config.DECISIONS_DIR / f"{dest.stem}_{int(time.time())}{dest.suffix}"
shutil.copy2(str(source), str(dest))
# Step 2: Extract text
_progress[task_id] = {"status": "processing", "filename": req.filename, "step": "extracting"}
text, page_count = await dl_extractor.extract_text(str(dest))
# Step 3: Extract metadata with Claude
_progress[task_id] = {"status": "processing", "filename": req.filename, "step": "extracting_metadata"}
metadata = await _extract_metadata_with_claude(text)
# Parse date
d_date = None
if metadata.get("decision_date"):
try:
d_date = date_type.fromisoformat(metadata["decision_date"])
except (ValueError, TypeError):
d_date = None
title = req.title or metadata.get("title", original_name.rsplit(".", 1)[0])
# Step 4: Create decision record
_progress[task_id] = {"status": "registering", "filename": req.filename}
decision = await dl_db.create_decision(
title=title,
file_path=str(dest),
court=metadata.get("court", ""),
decision_date=d_date,
case_number=metadata.get("case_number", ""),
judge=metadata.get("judge", ""),
parties_appellant=metadata.get("parties_appellant", ""),
parties_respondent=metadata.get("parties_respondent", "המוסד לביטוח לאומי"),
topics=metadata.get("topics"),
outcome=metadata.get("outcome", ""),
)
decision_id = UUID(decision["id"])
# Update with extracted text
await dl_db.update_decision(
decision_id,
extracted_text=text,
page_count=page_count,
summary=metadata.get("summary", ""),
)
# Step 5: Chunk
_progress[task_id] = {"status": "processing", "filename": req.filename, "step": "chunking"}
from din_leumi.services import chunker as dl_chunker, embeddings as dl_embeddings
chunks = dl_chunker.chunk_document(text)
chunk_count = 0
if chunks:
# Step 6: Embed
_progress[task_id] = {"status": "processing", "filename": req.filename, "step": "embedding"}
texts = [c.content for c in chunks]
embs = await dl_embeddings.embed_texts(texts, input_type="document")
chunk_dicts = [
{
"content": c.content,
"section_type": c.section_type,
"embedding": emb,
"page_number": c.page_number,
"chunk_index": c.chunk_index,
}
for c, emb in zip(chunks, embs)
]
await dl_db.store_chunks(decision_id, chunk_dicts)
chunk_count = len(chunks)
await dl_db.update_decision(decision_id, extraction_status="completed")
await dl_db.ensure_ivfflat_index()
# Remove from uploads
source.unlink(missing_ok=True)
_progress[task_id] = {
"status": "completed",
"filename": req.filename,
"system": "din-leumi",
"result": {
"decision_id": str(decision_id),
"title": title,
"pages": page_count,
"text_length": len(text),
"chunks": chunk_count,
},
"metadata": {
"court": metadata.get("court", ""),
"judge": metadata.get("judge", ""),
"case_number": metadata.get("case_number", ""),
"decision_date": metadata.get("decision_date", ""),
"outcome": metadata.get("outcome", ""),
"topics": metadata.get("topics", []),
"summary": metadata.get("summary", ""),
},
}
except Exception as e:
logger.exception("Din Leumi processing failed for %s", req.filename)
_progress[task_id] = {"status": "failed", "error": str(e), "filename": req.filename}