From 324807ff1db3b7d0ba76c1cf8528bba76e6b2311 Mon Sep 17 00:00:00 2001 From: Chaim Date: Sat, 4 Apr 2026 08:21:31 +0000 Subject: [PATCH] Fix Docker build: bundle din-leumi instead of git clone Removes GITEA_TOKEN dependency from build by copying din-leumi MCP server source directly into the Docker context. Co-Authored-By: Claude Opus 4.6 (1M context) --- Dockerfile | 8 +- din-leumi/pyproject.toml | 25 ++ din-leumi/src/din_leumi/__init__.py | 0 din-leumi/src/din_leumi/__main__.py | 4 + din-leumi/src/din_leumi/config.py | 36 ++ din-leumi/src/din_leumi/server.py | 156 ++++++++ din-leumi/src/din_leumi/services/__init__.py | 0 din-leumi/src/din_leumi/services/chunker.py | 132 +++++++ din-leumi/src/din_leumi/services/db.py | 374 ++++++++++++++++++ .../src/din_leumi/services/embeddings.py | 55 +++ din-leumi/src/din_leumi/services/extractor.py | 126 ++++++ din-leumi/src/din_leumi/services/processor.py | 82 ++++ din-leumi/src/din_leumi/tools/__init__.py | 0 din-leumi/src/din_leumi/tools/decisions.py | 241 +++++++++++ din-leumi/src/din_leumi/tools/search.py | 97 +++++ 15 files changed, 1331 insertions(+), 5 deletions(-) create mode 100644 din-leumi/pyproject.toml create mode 100644 din-leumi/src/din_leumi/__init__.py create mode 100644 din-leumi/src/din_leumi/__main__.py create mode 100644 din-leumi/src/din_leumi/config.py create mode 100644 din-leumi/src/din_leumi/server.py create mode 100644 din-leumi/src/din_leumi/services/__init__.py create mode 100644 din-leumi/src/din_leumi/services/chunker.py create mode 100644 din-leumi/src/din_leumi/services/db.py create mode 100644 din-leumi/src/din_leumi/services/embeddings.py create mode 100644 din-leumi/src/din_leumi/services/extractor.py create mode 100644 din-leumi/src/din_leumi/services/processor.py create mode 100644 din-leumi/src/din_leumi/tools/__init__.py create mode 100644 din-leumi/src/din_leumi/tools/decisions.py create mode 100644 din-leumi/src/din_leumi/tools/search.py diff --git a/Dockerfile b/Dockerfile index 8058294..401183d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -11,11 +11,8 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ COPY mcp-server/pyproject.toml /app/mcp-server/pyproject.toml COPY mcp-server/src/ /app/mcp-server/src/ -# Clone and install Din Leumi MCP server -ARG GITEA_TOKEN="" -RUN git clone https://Chaim:${GITEA_TOKEN}@gitea.nautilus.marcusgroup.org/Chaim/din-leumi.git /tmp/din-leumi && \ - cp -r /tmp/din-leumi/mcp-server /app/din-leumi && \ - rm -rf /tmp/din-leumi/.git +# Copy Din Leumi MCP server source +COPY din-leumi/ /app/din-leumi/ # Install both MCP servers + web deps RUN pip install --no-cache-dir /app/mcp-server /app/din-leumi && \ @@ -25,6 +22,7 @@ RUN pip install --no-cache-dir /app/mcp-server /app/din-leumi && \ COPY web/ /app/web/ ENV PYTHONPATH=/app/mcp-server/src:/app/din-leumi/src +ENV DOTENV_PATH=/dev/null ENV DOTENV_PATH=/home/chaim/.env EXPOSE 8080 diff --git a/din-leumi/pyproject.toml b/din-leumi/pyproject.toml new file mode 100644 index 0000000..22001ef --- /dev/null +++ b/din-leumi/pyproject.toml @@ -0,0 +1,25 @@ +[project] +name = "din-leumi" +version = "0.1.0" +description = "MCP server for cataloging and searching National Insurance court decisions" +requires-python = ">=3.10" +dependencies = [ + "mcp[cli]>=1.0.0", + "asyncpg>=0.29.0", + "pgvector>=0.3.0", + "voyageai>=0.3.0", + "anthropic>=0.40.0", + "python-dotenv>=1.0.0", + "pydantic>=2.0.0", + "pymupdf>=1.25.0", + "python-docx>=1.1.0", + "striprtf>=0.0.26", + "pillow>=10.0.0", +] + +[build-system] +requires = ["setuptools>=68.0"] +build-backend = "setuptools.build_meta" + +[tool.setuptools.packages.find] +where = ["src"] diff --git a/din-leumi/src/din_leumi/__init__.py b/din-leumi/src/din_leumi/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/din-leumi/src/din_leumi/__main__.py b/din-leumi/src/din_leumi/__main__.py new file mode 100644 index 0000000..02ac546 --- /dev/null +++ b/din-leumi/src/din_leumi/__main__.py @@ -0,0 +1,4 @@ +"""Allow running with: python -m din_leumi""" +from din_leumi.server import main + +main() diff --git a/din-leumi/src/din_leumi/config.py b/din-leumi/src/din_leumi/config.py new file mode 100644 index 0000000..20aa59d --- /dev/null +++ b/din-leumi/src/din_leumi/config.py @@ -0,0 +1,36 @@ +"""Configuration loaded from central .env file.""" + +import os +from pathlib import Path + +from dotenv import load_dotenv + +# Load from central .env or override path +dotenv_path = os.environ.get("DOTENV_PATH", str(Path.home() / ".env")) +load_dotenv(dotenv_path) + +# PostgreSQL - uses shared server but separate database +POSTGRES_URL = os.environ.get( + "DIN_LEUMI_POSTGRES_URL", + f"postgres://{os.environ.get('POSTGRES_USER', 'legal_ai')}:" + f"{os.environ.get('POSTGRES_PASSWORD', '')}@" + f"{os.environ.get('POSTGRES_HOST', '127.0.0.1')}:" + f"{os.environ.get('POSTGRES_PORT', '5433')}/" + f"{os.environ.get('DIN_LEUMI_POSTGRES_DB', 'din_leumi')}", +) + +# Voyage AI +VOYAGE_API_KEY = os.environ.get("VOYAGE_API_KEY", "") +VOYAGE_MODEL = "voyage-3-large" +VOYAGE_DIMENSIONS = 1024 + +# Anthropic (for Claude Vision OCR) +ANTHROPIC_API_KEY = os.environ.get("ANTHROPIC_API_KEY", "") + +# Data directory +DATA_DIR = Path(os.environ.get("DIN_LEUMI_DATA_DIR", str(Path.home() / "din-leumi" / "data"))) +DECISIONS_DIR = DATA_DIR / "decisions" + +# Chunking parameters +CHUNK_SIZE_TOKENS = 600 +CHUNK_OVERLAP_TOKENS = 100 diff --git a/din-leumi/src/din_leumi/server.py b/din-leumi/src/din_leumi/server.py new file mode 100644 index 0000000..a30281e --- /dev/null +++ b/din-leumi/src/din_leumi/server.py @@ -0,0 +1,156 @@ +"""Din Leumi - MCP Server entry point. + +Run with: python -m din_leumi.server +""" + +from __future__ import annotations + +import logging +import sys +from collections.abc import AsyncIterator +from contextlib import asynccontextmanager + +from mcp.server.fastmcp import FastMCP + +# Configure logging to stderr (stdout is reserved for JSON-RPC) +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(name)s] %(levelname)s: %(message)s", + stream=sys.stderr, +) +logger = logging.getLogger("din_leumi") + + +@asynccontextmanager +async def lifespan(server: FastMCP) -> AsyncIterator[None]: + """Initialize DB schema on startup, close pool on shutdown.""" + from din_leumi.services.db import close_pool, init_schema + + logger.info("Initializing database schema...") + await init_schema() + logger.info("Din Leumi MCP server ready") + try: + yield + finally: + await close_pool() + logger.info("Din Leumi MCP server stopped") + + +# Create MCP server +mcp = FastMCP( + "Din Leumi - דין לאומי", + instructions="מערכת לקטלוג וחיפוש סמנטי של פסקי דין בתחום ביטוח לאומי", + lifespan=lifespan, +) + +# ── Import tool modules ──────────────────────────────────────────── + +from din_leumi.tools import decisions, search # noqa: E402 + + +# ── Decision management ──────────────────────────────────────────── + +@mcp.tool() +async def decision_upload( + file_path: str, + title: str = "", + court: str = "", + decision_date: str = "", + case_number: str = "", + judge: str = "", + parties_appellant: str = "", + parties_respondent: str = "המוסד לביטוח לאומי", + topics: list[str] | None = None, + outcome: str = "", +) -> str: + """העלאה ועיבוד פסק דין של בית דין לעבודה בתחום ביטוח לאומי (PDF/DOCX/RTF/TXT). + מחלץ טקסט, יוצר chunks ו-embeddings לחיפוש סמנטי. + outcome: accepted/rejected/partial/remanded.""" + return await decisions.decision_upload( + file_path, title, court, decision_date, case_number, + judge, parties_appellant, parties_respondent, topics, outcome, + ) + + +@mcp.tool() +async def decision_search( + query: str, + limit: int = 10, + court: str = "", + topic: str = "", + date_from: str = "", + date_to: str = "", + outcome: str = "", +) -> str: + """חיפוש סמנטי בפסקי דין של ביטוח לאומי. + ניתן לסנן לפי בית משפט, נושא, טווח תאריכים ותוצאה. + נושאים נפוצים: נכות כללית, נכות מעבודה, דמי לידה, תאונת עבודה, גמלת הבטחת הכנסה, דמי אבטלה.""" + return await search.decision_search( + query, limit, court, topic, date_from, date_to, outcome, + ) + + +@mcp.tool() +async def decision_list( + court: str = "", + topic: str = "", + judge: str = "", + date_from: str = "", + date_to: str = "", + outcome: str = "", + limit: int = 50, +) -> str: + """רשימת פסקי דין לפי מטאדאטא. סינון אופציונלי לפי בית משפט, נושא, שופט, תאריכים, תוצאה.""" + return await decisions.decision_list( + court, topic, judge, date_from, date_to, outcome, limit, + ) + + +@mcp.tool() +async def decision_get( + decision_id: str = "", + case_number: str = "", +) -> str: + """פרטי פסק דין מלאים כולל טקסט מחולץ. חיפוש לפי decision_id או case_number.""" + return await decisions.decision_get(decision_id, case_number) + + +@mcp.tool() +async def decision_update( + decision_id: str, + title: str = "", + court: str = "", + decision_date: str = "", + case_number: str = "", + judge: str = "", + parties_appellant: str = "", + parties_respondent: str = "", + topics: list[str] | None = None, + outcome: str = "", + summary: str = "", +) -> str: + """עדכון מטאדאטא של פסק דין.""" + return await decisions.decision_update( + decision_id, title, court, decision_date, case_number, + judge, parties_appellant, parties_respondent, topics, outcome, summary, + ) + + +@mcp.tool() +async def decision_delete(decision_id: str) -> str: + """מחיקת פסק דין וכל ה-chunks שלו מהמערכת.""" + return await decisions.decision_delete(decision_id) + + +@mcp.tool() +async def system_status() -> str: + """סטטוס מערכת דין לאומי - מספר פסקי דין, chunks, סטטיסטיקות.""" + return await search.system_status() + + +def main(): + mcp.run(transport="stdio") + + +if __name__ == "__main__": + main() diff --git a/din-leumi/src/din_leumi/services/__init__.py b/din-leumi/src/din_leumi/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/din-leumi/src/din_leumi/services/chunker.py b/din-leumi/src/din_leumi/services/chunker.py new file mode 100644 index 0000000..3fe7016 --- /dev/null +++ b/din-leumi/src/din_leumi/services/chunker.py @@ -0,0 +1,132 @@ +"""Legal document chunker - splits text into sections and chunks for RAG. + +Adapted for National Insurance (Bituach Leumi) court decisions. +""" + +from __future__ import annotations + +import re +from dataclasses import dataclass + +from din_leumi import config + +# Hebrew legal section headers for NI court decisions +SECTION_PATTERNS = [ + (r"רקע\s*עובדתי|רקע\s*כללי|העובדות|הרקע|עובדות\s*המקרה", "facts"), + (r"טענות\s*התובע[ת]?|טענות\s*המבוטח[ת]?|טענות\s*המערער[ת]?|עיקר\s*טענות\s*התובע", "claimant_claims"), + (r"טענות\s*הנתבע[ת]?|טענות\s*המוסד|עיקר\s*טענות\s*הנתבע|תשובת\s*המוסד", "respondent_claims"), + (r"דיון\s*והכרעה|דיון|הכרעה|ניתוח\s*משפטי|המסגרת\s*המשפטית|הכרעת\s*הדין", "legal_analysis"), + (r"מסקנ[הות]|סיכום", "conclusion"), + (r"סוף\s*דבר|לסיכום|התוצאה|אשר\s*על\s*כן", "ruling"), + (r"מבוא|פתיחה|לפניי|לפני[נו]?", "intro"), + (r"הדין\s*החל|המסגרת\s*הנורמטיבית|הוראות\s*החוק", "legal_framework"), +] + + +@dataclass +class Chunk: + content: str + section_type: str = "other" + page_number: int | None = None + chunk_index: int = 0 + + +def chunk_document( + text: str, + chunk_size: int = config.CHUNK_SIZE_TOKENS, + overlap: int = config.CHUNK_OVERLAP_TOKENS, +) -> list[Chunk]: + """Split a legal document into chunks, respecting section boundaries.""" + if not text.strip(): + return [] + + sections = _split_into_sections(text) + chunks: list[Chunk] = [] + idx = 0 + + for section_type, section_text in sections: + section_chunks = _split_section(section_text, chunk_size, overlap) + for chunk_text in section_chunks: + chunks.append(Chunk( + content=chunk_text, + section_type=section_type, + chunk_index=idx, + )) + idx += 1 + + return chunks + + +def _split_into_sections(text: str) -> list[tuple[str, str]]: + """Split text into (section_type, text) pairs based on Hebrew headers.""" + markers: list[tuple[int, str]] = [] + + for pattern, section_type in SECTION_PATTERNS: + for match in re.finditer(pattern, text): + markers.append((match.start(), section_type)) + + if not markers: + return [("other", text)] + + markers.sort(key=lambda x: x[0]) + + sections: list[tuple[str, str]] = [] + + # Text before first section + if markers[0][0] > 0: + intro_text = text[: markers[0][0]].strip() + if intro_text: + sections.append(("intro", intro_text)) + + # Each section + for i, (pos, section_type) in enumerate(markers): + end = markers[i + 1][0] if i + 1 < len(markers) else len(text) + section_text = text[pos:end].strip() + if section_text: + sections.append((section_type, section_text)) + + return sections + + +def _split_section(text: str, chunk_size: int, overlap: int) -> list[str]: + """Split a section into overlapping chunks by paragraphs. + + Uses approximate token counting (Hebrew ~1.5 chars per token). + """ + if not text.strip(): + return [] + + paragraphs = [p.strip() for p in text.split("\n") if p.strip()] + chunks: list[str] = [] + current: list[str] = [] + current_tokens = 0 + + for para in paragraphs: + para_tokens = _estimate_tokens(para) + + if current_tokens + para_tokens > chunk_size and current: + chunks.append("\n".join(current)) + # Keep overlap + overlap_paras: list[str] = [] + overlap_tokens = 0 + for p in reversed(current): + pt = _estimate_tokens(p) + if overlap_tokens + pt > overlap: + break + overlap_paras.insert(0, p) + overlap_tokens += pt + current = overlap_paras + current_tokens = overlap_tokens + + current.append(para) + current_tokens += para_tokens + + if current: + chunks.append("\n".join(current)) + + return chunks + + +def _estimate_tokens(text: str) -> int: + """Rough token estimate for Hebrew text (~1.5 chars per token).""" + return max(1, len(text) // 2) diff --git a/din-leumi/src/din_leumi/services/db.py b/din-leumi/src/din_leumi/services/db.py new file mode 100644 index 0000000..ffdeac2 --- /dev/null +++ b/din-leumi/src/din_leumi/services/db.py @@ -0,0 +1,374 @@ +"""Database service - asyncpg connection pool and queries for din-leumi.""" + +from __future__ import annotations + +import json +import logging +from datetime import date +from uuid import UUID, uuid4 + +import asyncpg +from pgvector.asyncpg import register_vector + +from din_leumi import config + +logger = logging.getLogger(__name__) + +_pool: asyncpg.Pool | None = None + + +async def get_pool() -> asyncpg.Pool: + global _pool + if _pool is None: + conn = await asyncpg.connect(config.POSTGRES_URL) + await conn.execute('CREATE EXTENSION IF NOT EXISTS vector') + await conn.execute('CREATE EXTENSION IF NOT EXISTS "uuid-ossp"') + await conn.close() + + _pool = await asyncpg.create_pool( + config.POSTGRES_URL, + min_size=2, + max_size=10, + init=_init_connection, + ) + return _pool + + +async def _init_connection(conn: asyncpg.Connection) -> None: + await register_vector(conn) + + +async def close_pool() -> None: + global _pool + if _pool: + await _pool.close() + _pool = None + + +# ── Schema ────────────────────────────────────────────────────────── + +SCHEMA_SQL = """ + +CREATE TABLE IF NOT EXISTS decisions ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + title TEXT NOT NULL, + file_path TEXT NOT NULL, + extracted_text TEXT DEFAULT '', + extraction_status TEXT DEFAULT 'pending', + + court TEXT DEFAULT '', + decision_date DATE, + case_number TEXT DEFAULT '', + judge TEXT DEFAULT '', + parties_appellant TEXT DEFAULT '', + parties_respondent TEXT DEFAULT 'המוסד לביטוח לאומי', + topics JSONB DEFAULT '[]', + outcome TEXT DEFAULT '', + summary TEXT DEFAULT '', + + page_count INTEGER, + created_at TIMESTAMPTZ DEFAULT now(), + updated_at TIMESTAMPTZ DEFAULT now() +); + +CREATE TABLE IF NOT EXISTS decision_chunks ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + decision_id UUID REFERENCES decisions(id) ON DELETE CASCADE, + chunk_index INTEGER NOT NULL, + content TEXT NOT NULL, + section_type TEXT DEFAULT 'other', + embedding vector(1024), + page_number INTEGER, + created_at TIMESTAMPTZ DEFAULT now() +); + +CREATE INDEX IF NOT EXISTS idx_chunks_decision ON decision_chunks(decision_id); +CREATE INDEX IF NOT EXISTS idx_decisions_court ON decisions(court); +CREATE INDEX IF NOT EXISTS idx_decisions_date ON decisions(decision_date); +CREATE INDEX IF NOT EXISTS idx_decisions_case_number ON decisions(case_number); +CREATE INDEX IF NOT EXISTS idx_decisions_topics ON decisions USING gin(topics); +""" + +# IVFFlat index requires data to exist; create after first batch of decisions +IVFFLAT_INDEX_SQL = """ +CREATE INDEX IF NOT EXISTS idx_chunks_embedding + ON decision_chunks USING ivfflat (embedding vector_cosine_ops) + WITH (lists = 100); +""" + + +async def init_schema() -> None: + pool = await get_pool() + async with pool.acquire() as conn: + await conn.execute(SCHEMA_SQL) + # Check if we have enough data for IVFFlat + count = await conn.fetchval("SELECT count(*) FROM decision_chunks") + if count and count > 100: + await conn.execute(IVFFLAT_INDEX_SQL) + logger.info("IVFFlat index created (%d chunks)", count) + logger.info("Database schema initialized") + + +async def ensure_ivfflat_index() -> None: + """Create IVFFlat index if enough data exists.""" + pool = await get_pool() + async with pool.acquire() as conn: + count = await conn.fetchval("SELECT count(*) FROM decision_chunks") + if count and count > 100: + await conn.execute(IVFFLAT_INDEX_SQL) + + +# ── Decision CRUD ─────────────────────────────────────────────────── + +async def create_decision( + title: str, + file_path: str, + court: str = "", + decision_date: date | None = None, + case_number: str = "", + judge: str = "", + parties_appellant: str = "", + parties_respondent: str = "המוסד לביטוח לאומי", + topics: list[str] | None = None, + outcome: str = "", +) -> dict: + pool = await get_pool() + decision_id = uuid4() + async with pool.acquire() as conn: + await conn.execute( + """INSERT INTO decisions (id, title, file_path, court, decision_date, + case_number, judge, parties_appellant, parties_respondent, + topics, outcome) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)""", + decision_id, title, file_path, court, decision_date, + case_number, judge, parties_appellant, parties_respondent, + json.dumps(topics or []), outcome, + ) + return await get_decision(decision_id) + + +async def get_decision(decision_id: UUID) -> dict | None: + pool = await get_pool() + async with pool.acquire() as conn: + row = await conn.fetchrow("SELECT * FROM decisions WHERE id = $1", decision_id) + if row is None: + return None + return _row_to_decision(row) + + +async def get_decision_by_case_number(case_number: str) -> dict | None: + pool = await get_pool() + async with pool.acquire() as conn: + row = await conn.fetchrow( + "SELECT * FROM decisions WHERE case_number = $1", case_number + ) + if row is None: + return None + return _row_to_decision(row) + + +async def list_decisions( + court: str = "", + topic: str = "", + judge: str = "", + date_from: date | None = None, + date_to: date | None = None, + outcome: str = "", + limit: int = 50, +) -> list[dict]: + pool = await get_pool() + conditions = [] + params: list = [] + idx = 1 + + if court: + conditions.append(f"court ILIKE ${idx}") + params.append(f"%{court}%") + idx += 1 + if topic: + conditions.append(f"topics @> ${idx}::jsonb") + params.append(json.dumps([topic])) + idx += 1 + if judge: + conditions.append(f"judge ILIKE ${idx}") + params.append(f"%{judge}%") + idx += 1 + if date_from: + conditions.append(f"decision_date >= ${idx}") + params.append(date_from) + idx += 1 + if date_to: + conditions.append(f"decision_date <= ${idx}") + params.append(date_to) + idx += 1 + if outcome: + conditions.append(f"outcome = ${idx}") + params.append(outcome) + idx += 1 + + where = f"WHERE {' AND '.join(conditions)}" if conditions else "" + params.append(limit) + + sql = f""" + SELECT id, title, court, decision_date, case_number, judge, + parties_appellant, parties_respondent, topics, outcome, summary, + extraction_status, page_count, created_at + FROM decisions + {where} + ORDER BY decision_date DESC NULLS LAST, created_at DESC + LIMIT ${idx} + """ + async with pool.acquire() as conn: + rows = await conn.fetch(sql, *params) + return [_row_to_decision(r) for r in rows] + + +async def update_decision(decision_id: UUID, **fields) -> dict | None: + if not fields: + return await get_decision(decision_id) + pool = await get_pool() + set_clauses = [] + values = [] + for i, (key, val) in enumerate(fields.items(), start=2): + if key == "topics": + val = json.dumps(val) + set_clauses.append(f"{key} = ${i}") + values.append(val) + set_clauses.append("updated_at = now()") + sql = f"UPDATE decisions SET {', '.join(set_clauses)} WHERE id = $1" + async with pool.acquire() as conn: + await conn.execute(sql, decision_id, *values) + return await get_decision(decision_id) + + +async def delete_decision(decision_id: UUID) -> bool: + pool = await get_pool() + async with pool.acquire() as conn: + result = await conn.execute("DELETE FROM decisions WHERE id = $1", decision_id) + return result == "DELETE 1" + + +def _row_to_decision(row: asyncpg.Record) -> dict: + d = dict(row) + if isinstance(d.get("topics"), str): + d["topics"] = json.loads(d["topics"]) + for field in ("id",): + if field in d and d[field] is not None: + d[field] = str(d[field]) + return d + + +# ── Chunks & Vectors ─────────────────────────────────────────────── + +async def store_chunks( + decision_id: UUID, + chunks: list[dict], +) -> int: + """Store decision chunks with embeddings.""" + pool = await get_pool() + async with pool.acquire() as conn: + await conn.execute( + "DELETE FROM decision_chunks WHERE decision_id = $1", decision_id + ) + for chunk in chunks: + await conn.execute( + """INSERT INTO decision_chunks + (decision_id, chunk_index, content, section_type, embedding, page_number) + VALUES ($1, $2, $3, $4, $5, $6)""", + decision_id, + chunk["chunk_index"], + chunk["content"], + chunk.get("section_type", "other"), + chunk["embedding"], + chunk.get("page_number"), + ) + return len(chunks) + + +async def search_similar( + query_embedding: list[float], + limit: int = 10, + court: str = "", + topic: str = "", + date_from: date | None = None, + date_to: date | None = None, + outcome: str = "", +) -> list[dict]: + """Cosine similarity search on decision chunks with metadata filtering.""" + pool = await get_pool() + conditions = [] + params: list = [query_embedding, limit] + idx = 3 + + if court: + conditions.append(f"d.court ILIKE ${idx}") + params.append(f"%{court}%") + idx += 1 + if topic: + conditions.append(f"d.topics @> ${idx}::jsonb") + params.append(json.dumps([topic])) + idx += 1 + if date_from: + conditions.append(f"d.decision_date >= ${idx}") + params.append(date_from) + idx += 1 + if date_to: + conditions.append(f"d.decision_date <= ${idx}") + params.append(date_to) + idx += 1 + if outcome: + conditions.append(f"d.outcome = ${idx}") + params.append(outcome) + idx += 1 + + where = f"WHERE {' AND '.join(conditions)}" if conditions else "" + + sql = f""" + SELECT dc.content, dc.section_type, dc.page_number, + dc.decision_id, + d.title, d.case_number, d.court, d.decision_date, + d.judge, d.outcome, + 1 - (dc.embedding <=> $1) AS score + FROM decision_chunks dc + JOIN decisions d ON d.id = dc.decision_id + {where} + ORDER BY dc.embedding <=> $1 + LIMIT $2 + """ + async with pool.acquire() as conn: + rows = await conn.fetch(sql, *params) + results = [] + for r in rows: + d = dict(r) + if d.get("decision_id"): + d["decision_id"] = str(d["decision_id"]) + results.append(d) + return results + + +# ── Stats ────────────────────────────────────────────────────────── + +async def get_stats() -> dict: + pool = await get_pool() + async with pool.acquire() as conn: + total_decisions = await conn.fetchval("SELECT count(*) FROM decisions") + total_chunks = await conn.fetchval("SELECT count(*) FROM decision_chunks") + completed = await conn.fetchval( + "SELECT count(*) FROM decisions WHERE extraction_status = 'completed'" + ) + courts = await conn.fetch( + "SELECT court, count(*) as cnt FROM decisions WHERE court != '' GROUP BY court ORDER BY cnt DESC LIMIT 10" + ) + date_range = await conn.fetchrow( + "SELECT min(decision_date) as earliest, max(decision_date) as latest FROM decisions WHERE decision_date IS NOT NULL" + ) + return { + "total_decisions": total_decisions, + "completed_decisions": completed, + "total_chunks": total_chunks, + "courts": [{"court": r["court"], "count": r["cnt"]} for r in courts], + "date_range": { + "earliest": str(date_range["earliest"]) if date_range and date_range["earliest"] else None, + "latest": str(date_range["latest"]) if date_range and date_range["latest"] else None, + } if date_range else None, + } diff --git a/din-leumi/src/din_leumi/services/embeddings.py b/din-leumi/src/din_leumi/services/embeddings.py new file mode 100644 index 0000000..43e6969 --- /dev/null +++ b/din-leumi/src/din_leumi/services/embeddings.py @@ -0,0 +1,55 @@ +"""Embedding service using Voyage AI API.""" + +from __future__ import annotations + +import logging + +import voyageai + +from din_leumi import config + +logger = logging.getLogger(__name__) + +_client: voyageai.Client | None = None + + +def _get_client() -> voyageai.Client: + global _client + if _client is None: + _client = voyageai.Client(api_key=config.VOYAGE_API_KEY) + return _client + + +async def embed_texts(texts: list[str], input_type: str = "document") -> list[list[float]]: + """Embed a batch of texts using Voyage AI. + + Args: + texts: List of texts to embed (max 128 per call). + input_type: "document" for indexing, "query" for search queries. + + Returns: + List of embedding vectors (1024 dimensions each). + """ + if not texts: + return [] + + client = _get_client() + all_embeddings = [] + + # Voyage AI supports up to 128 texts per batch + for i in range(0, len(texts), 128): + batch = texts[i : i + 128] + result = client.embed( + batch, + model=config.VOYAGE_MODEL, + input_type=input_type, + ) + all_embeddings.extend(result.embeddings) + + return all_embeddings + + +async def embed_query(query: str) -> list[float]: + """Embed a single search query.""" + results = await embed_texts([query], input_type="query") + return results[0] diff --git a/din-leumi/src/din_leumi/services/extractor.py b/din-leumi/src/din_leumi/services/extractor.py new file mode 100644 index 0000000..d5525ee --- /dev/null +++ b/din-leumi/src/din_leumi/services/extractor.py @@ -0,0 +1,126 @@ +"""Text extraction from PDF, DOCX, and RTF files. + +Primary PDF extraction: Claude Vision API (for scanned documents). +Fallback: PyMuPDF direct text extraction (for born-digital PDFs). +""" + +from __future__ import annotations + +import base64 +import logging +from pathlib import Path + +import anthropic +import fitz # PyMuPDF +from docx import Document as DocxDocument +from striprtf.striprtf import rtf_to_text + +from din_leumi import config + +logger = logging.getLogger(__name__) + +_anthropic_client: anthropic.Anthropic | None = None + + +def _get_anthropic() -> anthropic.Anthropic: + global _anthropic_client + if _anthropic_client is None: + _anthropic_client = anthropic.Anthropic(api_key=config.ANTHROPIC_API_KEY) + return _anthropic_client + + +async def extract_text(file_path: str) -> tuple[str, int]: + """Extract text from a document file. + + Returns: + Tuple of (extracted_text, page_count). + page_count is 0 for non-PDF files. + """ + path = Path(file_path) + suffix = path.suffix.lower() + + if suffix == ".pdf": + return await _extract_pdf(path) + elif suffix == ".docx": + return _extract_docx(path), 0 + elif suffix == ".rtf": + return _extract_rtf(path), 0 + elif suffix == ".txt": + return path.read_text(encoding="utf-8"), 0 + else: + raise ValueError(f"Unsupported file type: {suffix}") + + +async def _extract_pdf(path: Path) -> tuple[str, int]: + """Extract text from PDF. Try direct text first, fall back to Claude Vision for scanned pages.""" + doc = fitz.open(str(path)) + page_count = len(doc) + pages_text: list[str] = [] + + for page_num in range(page_count): + page = doc[page_num] + # Try direct text extraction first + text = page.get_text().strip() + + if len(text) > 50: + # Sufficient text found - born-digital page + pages_text.append(text) + logger.debug("Page %d: direct text extraction (%d chars)", page_num + 1, len(text)) + else: + # Likely scanned - use Claude Vision + logger.info("Page %d: using Claude Vision OCR", page_num + 1) + pix = page.get_pixmap(dpi=200) + img_bytes = pix.tobytes("png") + ocr_text = await _ocr_with_claude(img_bytes, page_num + 1) + pages_text.append(ocr_text) + + doc.close() + return "\n\n".join(pages_text), page_count + + +async def _ocr_with_claude(image_bytes: bytes, page_num: int) -> str: + """OCR a single page image using Claude Vision API.""" + client = _get_anthropic() + b64_image = base64.b64encode(image_bytes).decode("utf-8") + + message = client.messages.create( + model="claude-sonnet-4-20250514", + max_tokens=4096, + messages=[ + { + "role": "user", + "content": [ + { + "type": "image", + "source": { + "type": "base64", + "media_type": "image/png", + "data": b64_image, + }, + }, + { + "type": "text", + "text": ( + "חלץ את כל הטקסט מהתמונה הזו. זהו מסמך משפטי בעברית. " + "שמור על מבנה הפסקאות המקורי. " + "החזר רק את הטקסט המחולץ, ללא הערות נוספות." + ), + }, + ], + } + ], + ) + return message.content[0].text + + +def _extract_docx(path: Path) -> str: + """Extract text from DOCX file.""" + doc = DocxDocument(str(path)) + paragraphs = [p.text for p in doc.paragraphs if p.text.strip()] + return "\n\n".join(paragraphs) + + +def _extract_rtf(path: Path) -> str: + """Extract text from RTF file.""" + rtf_content = path.read_text(encoding="utf-8", errors="replace") + return rtf_to_text(rtf_content) diff --git a/din-leumi/src/din_leumi/services/processor.py b/din-leumi/src/din_leumi/services/processor.py new file mode 100644 index 0000000..829af69 --- /dev/null +++ b/din-leumi/src/din_leumi/services/processor.py @@ -0,0 +1,82 @@ +"""Decision processing pipeline: extract → chunk → embed → store.""" + +from __future__ import annotations + +import logging +from uuid import UUID + +from din_leumi.services import chunker, db, embeddings, extractor + +logger = logging.getLogger(__name__) + + +async def process_decision(decision_id: UUID) -> dict: + """Full processing pipeline for a decision. + + 1. Extract text from file + 2. Split into chunks + 3. Generate embeddings + 4. Store chunks + embeddings in DB + + Returns processing summary. + """ + decision = await db.get_decision(decision_id) + if not decision: + raise ValueError(f"Decision {decision_id} not found") + + await db.update_decision(decision_id, extraction_status="processing") + + try: + # Step 1: Extract text + logger.info("Extracting text from %s", decision["file_path"]) + text, page_count = await extractor.extract_text(decision["file_path"]) + + await db.update_decision( + decision_id, + extracted_text=text, + page_count=page_count, + ) + + # Step 2: Chunk + logger.info("Chunking decision (%d chars)", len(text)) + chunks = chunker.chunk_document(text) + + if not chunks: + await db.update_decision(decision_id, extraction_status="completed") + return {"status": "completed", "chunks": 0, "message": "No text to chunk"} + + # Step 3: Embed + logger.info("Generating embeddings for %d chunks", len(chunks)) + texts = [c.content for c in chunks] + embs = await embeddings.embed_texts(texts, input_type="document") + + # Step 4: Store + chunk_dicts = [ + { + "content": c.content, + "section_type": c.section_type, + "embedding": emb, + "page_number": c.page_number, + "chunk_index": c.chunk_index, + } + for c, emb in zip(chunks, embs) + ] + + stored = await db.store_chunks(decision_id, chunk_dicts) + await db.update_decision(decision_id, extraction_status="completed") + + # Try to create IVFFlat index if we have enough data + await db.ensure_ivfflat_index() + + logger.info("Decision processed: %d chunks stored", stored) + return { + "status": "completed", + "chunks": stored, + "pages": page_count, + "text_length": len(text), + } + + except Exception as e: + logger.exception("Decision processing failed: %s", e) + await db.update_decision(decision_id, extraction_status="failed") + return {"status": "failed", "error": str(e)} diff --git a/din-leumi/src/din_leumi/tools/__init__.py b/din-leumi/src/din_leumi/tools/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/din-leumi/src/din_leumi/tools/decisions.py b/din-leumi/src/din_leumi/tools/decisions.py new file mode 100644 index 0000000..a193da1 --- /dev/null +++ b/din-leumi/src/din_leumi/tools/decisions.py @@ -0,0 +1,241 @@ +"""Decision CRUD tool implementations.""" + +from __future__ import annotations + +import json +import shutil +from datetime import date +from pathlib import Path +from uuid import UUID + +from din_leumi import config +from din_leumi.services import db, processor + + +async def decision_upload( + file_path: str, + title: str = "", + court: str = "", + decision_date: str = "", + case_number: str = "", + judge: str = "", + parties_appellant: str = "", + parties_respondent: str = "המוסד לביטוח לאומי", + topics: list[str] | None = None, + outcome: str = "", +) -> str: + """Upload and process a court decision.""" + source = Path(file_path) + if not source.exists(): + return f"❌ הקובץ לא נמצא: {file_path}" + + ext = source.suffix.lower() + if ext not in (".pdf", ".docx", ".rtf", ".txt"): + return f"❌ סוג קובץ לא נתמך: {ext}" + + # Copy to decisions directory + config.DECISIONS_DIR.mkdir(parents=True, exist_ok=True) + dest = config.DECISIONS_DIR / source.name + if dest.exists(): + # Add suffix to avoid overwrite + dest = config.DECISIONS_DIR / f"{source.stem}_{UUID.__class__.__name__[:8]}{ext}" + shutil.copy2(str(source), str(dest)) + + # Parse date + d_date = None + if decision_date: + try: + d_date = date.fromisoformat(decision_date) + except ValueError: + return f"❌ פורמט תאריך לא תקין: {decision_date}. נדרש YYYY-MM-DD" + + # Create decision record + if not title: + title = source.stem + + decision = await db.create_decision( + title=title, + file_path=str(dest), + court=court, + decision_date=d_date, + case_number=case_number, + judge=judge, + parties_appellant=parties_appellant, + parties_respondent=parties_respondent, + topics=topics, + outcome=outcome, + ) + + # Process (extract → chunk → embed → store) + result = await processor.process_decision(UUID(decision["id"])) + + status_icon = "✅" if result["status"] == "completed" else "❌" + lines = [ + f"{status_icon} פסק דין הועלה ועובד", + f" כותרת: {title}", + f" מזהה: {decision['id']}", + ] + if case_number: + lines.append(f" מספר תיק: {case_number}") + if result.get("chunks"): + lines.append(f" chunks: {result['chunks']}") + if result.get("pages"): + lines.append(f" עמודים: {result['pages']}") + if result.get("error"): + lines.append(f" שגיאה: {result['error']}") + + return "\n".join(lines) + + +async def decision_get( + decision_id: str = "", + case_number: str = "", +) -> str: + """Get full decision details.""" + if not decision_id and not case_number: + return "❌ נדרש decision_id או case_number" + + if decision_id: + decision = await db.get_decision(UUID(decision_id)) + else: + decision = await db.get_decision_by_case_number(case_number) + + if not decision: + return "❌ פסק דין לא נמצא" + + lines = [ + f"📄 {decision.get('title', '')}", + f" מזהה: {decision['id']}", + ] + if decision.get("case_number"): + lines.append(f" מספר תיק: {decision['case_number']}") + if decision.get("court"): + lines.append(f" בית משפט: {decision['court']}") + if decision.get("decision_date"): + lines.append(f" תאריך: {decision['decision_date']}") + if decision.get("judge"): + lines.append(f" שופט: {decision['judge']}") + if decision.get("parties_appellant"): + lines.append(f" תובע/מערער: {decision['parties_appellant']}") + if decision.get("parties_respondent"): + lines.append(f" נתבע/משיב: {decision['parties_respondent']}") + if decision.get("topics"): + topics = decision["topics"] + if isinstance(topics, str): + topics = json.loads(topics) + if topics: + lines.append(f" נושאים: {', '.join(topics)}") + if decision.get("outcome"): + lines.append(f" תוצאה: {decision['outcome']}") + if decision.get("summary"): + lines.append(f" תקציר: {decision['summary']}") + + lines.append(f" סטטוס: {decision.get('extraction_status', 'unknown')}") + if decision.get("page_count"): + lines.append(f" עמודים: {decision['page_count']}") + + # Include extracted text (truncated) + text = decision.get("extracted_text", "") + if text: + lines.append("") + lines.append("── טקסט מחולץ ──") + if len(text) > 15000: + lines.append(text[:15000]) + lines.append(f"\n... (נקטע, סה\"כ {len(text)} תווים)") + else: + lines.append(text) + + return "\n".join(lines) + + +async def decision_list( + court: str = "", + topic: str = "", + judge: str = "", + date_from: str = "", + date_to: str = "", + outcome: str = "", + limit: int = 50, +) -> str: + """List decisions with optional filters.""" + d_from = date.fromisoformat(date_from) if date_from else None + d_to = date.fromisoformat(date_to) if date_to else None + + decisions = await db.list_decisions( + court=court, topic=topic, judge=judge, + date_from=d_from, date_to=d_to, + outcome=outcome, limit=limit, + ) + + if not decisions: + return "לא נמצאו פסקי דין" + + lines = [f"נמצאו {len(decisions)} פסקי דין:\n"] + for d in decisions: + parts = [f"• {d.get('title', 'ללא כותרת')}"] + if d.get("case_number"): + parts.append(f" [{d['case_number']}]") + if d.get("court"): + parts.append(f" {d['court']}") + if d.get("decision_date"): + parts.append(f" {d['decision_date']}") + if d.get("outcome"): + parts.append(f" ({d['outcome']})") + lines.append(" ".join(parts)) + lines.append(f" מזהה: {d['id']}") + + return "\n".join(lines) + + +async def decision_update( + decision_id: str, + title: str = "", + court: str = "", + decision_date: str = "", + case_number: str = "", + judge: str = "", + parties_appellant: str = "", + parties_respondent: str = "", + topics: list[str] | None = None, + outcome: str = "", + summary: str = "", +) -> str: + """Update decision metadata.""" + fields = {} + if title: + fields["title"] = title + if court: + fields["court"] = court + if decision_date: + fields["decision_date"] = date.fromisoformat(decision_date) + if case_number: + fields["case_number"] = case_number + if judge: + fields["judge"] = judge + if parties_appellant: + fields["parties_appellant"] = parties_appellant + if parties_respondent: + fields["parties_respondent"] = parties_respondent + if topics is not None: + fields["topics"] = topics + if outcome: + fields["outcome"] = outcome + if summary: + fields["summary"] = summary + + if not fields: + return "❌ לא צוינו שדות לעדכון" + + result = await db.update_decision(UUID(decision_id), **fields) + if not result: + return "❌ פסק דין לא נמצא" + + return f"✅ פסק דין {decision_id} עודכן ({', '.join(fields.keys())})" + + +async def decision_delete(decision_id: str) -> str: + """Delete a decision and all its chunks.""" + deleted = await db.delete_decision(UUID(decision_id)) + if deleted: + return f"✅ פסק דין {decision_id} נמחק" + return "❌ פסק דין לא נמצא" diff --git a/din-leumi/src/din_leumi/tools/search.py b/din-leumi/src/din_leumi/tools/search.py new file mode 100644 index 0000000..65ed808 --- /dev/null +++ b/din-leumi/src/din_leumi/tools/search.py @@ -0,0 +1,97 @@ +"""Search tool implementations.""" + +from __future__ import annotations + +import json +from datetime import date + +from din_leumi.services import db, embeddings + + +async def decision_search( + query: str, + limit: int = 10, + court: str = "", + topic: str = "", + date_from: str = "", + date_to: str = "", + outcome: str = "", +) -> str: + """Semantic search across all decisions with optional metadata filters.""" + if not query.strip(): + return "❌ נדרש טקסט לחיפוש" + + # Embed query + query_emb = await embeddings.embed_query(query) + + d_from = date.fromisoformat(date_from) if date_from else None + d_to = date.fromisoformat(date_to) if date_to else None + + results = await db.search_similar( + query_embedding=query_emb, + limit=limit, + court=court, + topic=topic, + date_from=d_from, + date_to=d_to, + outcome=outcome, + ) + + if not results: + return "לא נמצאו תוצאות" + + lines = [f"🔍 נמצאו {len(results)} תוצאות עבור: \"{query}\"\n"] + + for i, r in enumerate(results, 1): + score = r.get("score", 0) + lines.append(f"── תוצאה {i} (ציון: {score:.3f}) ──") + lines.append(f" פסק דין: {r.get('title', '')}") + if r.get("case_number"): + lines.append(f" מספר תיק: {r['case_number']}") + if r.get("court"): + lines.append(f" בית משפט: {r['court']}") + if r.get("decision_date"): + lines.append(f" תאריך: {r['decision_date']}") + if r.get("judge"): + lines.append(f" שופט: {r['judge']}") + if r.get("outcome"): + lines.append(f" תוצאה: {r['outcome']}") + lines.append(f" סוג קטע: {r.get('section_type', 'other')}") + lines.append(f" מזהה: {r.get('decision_id', '')}") + lines.append("") + + # Show content snippet + content = r.get("content", "") + if len(content) > 500: + content = content[:500] + "..." + lines.append(f" {content}") + lines.append("") + + return "\n".join(lines) + + +async def system_status() -> str: + """Get system statistics.""" + stats = await db.get_stats() + + lines = [ + "📊 סטטוס מערכת דין לאומי", + "", + f" סה\"כ פסקי דין: {stats['total_decisions']}", + f" עובדו בהצלחה: {stats['completed_decisions']}", + f" סה\"כ chunks: {stats['total_chunks']}", + ] + + if stats.get("courts"): + lines.append("") + lines.append(" בתי משפט:") + for c in stats["courts"]: + lines.append(f" • {c['court']}: {c['count']}") + + if stats.get("date_range"): + dr = stats["date_range"] + if dr.get("earliest"): + lines.append("") + lines.append(f" טווח תאריכים: {dr['earliest']} — {dr['latest']}") + + return "\n".join(lines)