From cb41867bc94791c6e1d0eae608a77360b5a70db4 Mon Sep 17 00:00:00 2001 From: Chaim Date: Sat, 4 Apr 2026 08:34:35 +0000 Subject: [PATCH] Remove din-leumi: fully separate into standalone service - Removed din-leumi imports, endpoints, and processing from app.py - Removed bundled din-leumi source from repo - Simplified Dockerfile (no din-leumi dependency) - din-leumi now runs as its own Coolify application Co-Authored-By: Claude Opus 4.6 (1M context) --- Dockerfile | 10 +- din-leumi/pyproject.toml | 25 -- din-leumi/src/din_leumi/__init__.py | 0 din-leumi/src/din_leumi/__main__.py | 4 - din-leumi/src/din_leumi/config.py | 36 -- din-leumi/src/din_leumi/server.py | 156 -------- din-leumi/src/din_leumi/services/__init__.py | 0 din-leumi/src/din_leumi/services/chunker.py | 132 ------- din-leumi/src/din_leumi/services/db.py | 374 ------------------ .../src/din_leumi/services/embeddings.py | 55 --- din-leumi/src/din_leumi/services/extractor.py | 126 ------ din-leumi/src/din_leumi/services/processor.py | 82 ---- din-leumi/src/din_leumi/tools/__init__.py | 0 din-leumi/src/din_leumi/tools/decisions.py | 241 ----------- din-leumi/src/din_leumi/tools/search.py | 97 ----- web/app.py | 214 ---------- 16 files changed, 3 insertions(+), 1549 deletions(-) delete mode 100644 din-leumi/pyproject.toml delete mode 100644 din-leumi/src/din_leumi/__init__.py delete mode 100644 din-leumi/src/din_leumi/__main__.py delete mode 100644 din-leumi/src/din_leumi/config.py delete mode 100644 din-leumi/src/din_leumi/server.py delete mode 100644 din-leumi/src/din_leumi/services/__init__.py delete mode 100644 din-leumi/src/din_leumi/services/chunker.py delete mode 100644 din-leumi/src/din_leumi/services/db.py delete mode 100644 din-leumi/src/din_leumi/services/embeddings.py delete mode 100644 din-leumi/src/din_leumi/services/extractor.py delete mode 100644 din-leumi/src/din_leumi/services/processor.py delete mode 100644 din-leumi/src/din_leumi/tools/__init__.py delete mode 100644 din-leumi/src/din_leumi/tools/decisions.py delete mode 100644 din-leumi/src/din_leumi/tools/search.py diff --git a/Dockerfile b/Dockerfile index 401183d..e48ebde 100644 --- a/Dockerfile +++ b/Dockerfile @@ -11,19 +11,15 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ COPY mcp-server/pyproject.toml /app/mcp-server/pyproject.toml COPY mcp-server/src/ /app/mcp-server/src/ -# Copy Din Leumi MCP server source -COPY din-leumi/ /app/din-leumi/ - -# Install both MCP servers + web deps -RUN pip install --no-cache-dir /app/mcp-server /app/din-leumi && \ +# Install MCP server + web deps +RUN pip install --no-cache-dir /app/mcp-server && \ pip install --no-cache-dir fastapi uvicorn python-multipart # Copy web app COPY web/ /app/web/ -ENV PYTHONPATH=/app/mcp-server/src:/app/din-leumi/src +ENV PYTHONPATH=/app/mcp-server/src ENV DOTENV_PATH=/dev/null -ENV DOTENV_PATH=/home/chaim/.env EXPOSE 8080 diff --git a/din-leumi/pyproject.toml b/din-leumi/pyproject.toml deleted file mode 100644 index 22001ef..0000000 --- a/din-leumi/pyproject.toml +++ /dev/null @@ -1,25 +0,0 @@ -[project] -name = "din-leumi" -version = "0.1.0" -description = "MCP server for cataloging and searching National Insurance court decisions" -requires-python = ">=3.10" -dependencies = [ - "mcp[cli]>=1.0.0", - "asyncpg>=0.29.0", - "pgvector>=0.3.0", - "voyageai>=0.3.0", - "anthropic>=0.40.0", - "python-dotenv>=1.0.0", - "pydantic>=2.0.0", - "pymupdf>=1.25.0", - "python-docx>=1.1.0", - "striprtf>=0.0.26", - "pillow>=10.0.0", -] - -[build-system] -requires = ["setuptools>=68.0"] -build-backend = "setuptools.build_meta" - -[tool.setuptools.packages.find] -where = ["src"] diff --git a/din-leumi/src/din_leumi/__init__.py b/din-leumi/src/din_leumi/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/din-leumi/src/din_leumi/__main__.py b/din-leumi/src/din_leumi/__main__.py deleted file mode 100644 index 02ac546..0000000 --- a/din-leumi/src/din_leumi/__main__.py +++ /dev/null @@ -1,4 +0,0 @@ -"""Allow running with: python -m din_leumi""" -from din_leumi.server import main - -main() diff --git a/din-leumi/src/din_leumi/config.py b/din-leumi/src/din_leumi/config.py deleted file mode 100644 index 20aa59d..0000000 --- a/din-leumi/src/din_leumi/config.py +++ /dev/null @@ -1,36 +0,0 @@ -"""Configuration loaded from central .env file.""" - -import os -from pathlib import Path - -from dotenv import load_dotenv - -# Load from central .env or override path -dotenv_path = os.environ.get("DOTENV_PATH", str(Path.home() / ".env")) -load_dotenv(dotenv_path) - -# PostgreSQL - uses shared server but separate database -POSTGRES_URL = os.environ.get( - "DIN_LEUMI_POSTGRES_URL", - f"postgres://{os.environ.get('POSTGRES_USER', 'legal_ai')}:" - f"{os.environ.get('POSTGRES_PASSWORD', '')}@" - f"{os.environ.get('POSTGRES_HOST', '127.0.0.1')}:" - f"{os.environ.get('POSTGRES_PORT', '5433')}/" - f"{os.environ.get('DIN_LEUMI_POSTGRES_DB', 'din_leumi')}", -) - -# Voyage AI -VOYAGE_API_KEY = os.environ.get("VOYAGE_API_KEY", "") -VOYAGE_MODEL = "voyage-3-large" -VOYAGE_DIMENSIONS = 1024 - -# Anthropic (for Claude Vision OCR) -ANTHROPIC_API_KEY = os.environ.get("ANTHROPIC_API_KEY", "") - -# Data directory -DATA_DIR = Path(os.environ.get("DIN_LEUMI_DATA_DIR", str(Path.home() / "din-leumi" / "data"))) -DECISIONS_DIR = DATA_DIR / "decisions" - -# Chunking parameters -CHUNK_SIZE_TOKENS = 600 -CHUNK_OVERLAP_TOKENS = 100 diff --git a/din-leumi/src/din_leumi/server.py b/din-leumi/src/din_leumi/server.py deleted file mode 100644 index a30281e..0000000 --- a/din-leumi/src/din_leumi/server.py +++ /dev/null @@ -1,156 +0,0 @@ -"""Din Leumi - MCP Server entry point. - -Run with: python -m din_leumi.server -""" - -from __future__ import annotations - -import logging -import sys -from collections.abc import AsyncIterator -from contextlib import asynccontextmanager - -from mcp.server.fastmcp import FastMCP - -# Configure logging to stderr (stdout is reserved for JSON-RPC) -logging.basicConfig( - level=logging.INFO, - format="%(asctime)s [%(name)s] %(levelname)s: %(message)s", - stream=sys.stderr, -) -logger = logging.getLogger("din_leumi") - - -@asynccontextmanager -async def lifespan(server: FastMCP) -> AsyncIterator[None]: - """Initialize DB schema on startup, close pool on shutdown.""" - from din_leumi.services.db import close_pool, init_schema - - logger.info("Initializing database schema...") - await init_schema() - logger.info("Din Leumi MCP server ready") - try: - yield - finally: - await close_pool() - logger.info("Din Leumi MCP server stopped") - - -# Create MCP server -mcp = FastMCP( - "Din Leumi - דין לאומי", - instructions="מערכת לקטלוג וחיפוש סמנטי של פסקי דין בתחום ביטוח לאומי", - lifespan=lifespan, -) - -# ── Import tool modules ──────────────────────────────────────────── - -from din_leumi.tools import decisions, search # noqa: E402 - - -# ── Decision management ──────────────────────────────────────────── - -@mcp.tool() -async def decision_upload( - file_path: str, - title: str = "", - court: str = "", - decision_date: str = "", - case_number: str = "", - judge: str = "", - parties_appellant: str = "", - parties_respondent: str = "המוסד לביטוח לאומי", - topics: list[str] | None = None, - outcome: str = "", -) -> str: - """העלאה ועיבוד פסק דין של בית דין לעבודה בתחום ביטוח לאומי (PDF/DOCX/RTF/TXT). - מחלץ טקסט, יוצר chunks ו-embeddings לחיפוש סמנטי. - outcome: accepted/rejected/partial/remanded.""" - return await decisions.decision_upload( - file_path, title, court, decision_date, case_number, - judge, parties_appellant, parties_respondent, topics, outcome, - ) - - -@mcp.tool() -async def decision_search( - query: str, - limit: int = 10, - court: str = "", - topic: str = "", - date_from: str = "", - date_to: str = "", - outcome: str = "", -) -> str: - """חיפוש סמנטי בפסקי דין של ביטוח לאומי. - ניתן לסנן לפי בית משפט, נושא, טווח תאריכים ותוצאה. - נושאים נפוצים: נכות כללית, נכות מעבודה, דמי לידה, תאונת עבודה, גמלת הבטחת הכנסה, דמי אבטלה.""" - return await search.decision_search( - query, limit, court, topic, date_from, date_to, outcome, - ) - - -@mcp.tool() -async def decision_list( - court: str = "", - topic: str = "", - judge: str = "", - date_from: str = "", - date_to: str = "", - outcome: str = "", - limit: int = 50, -) -> str: - """רשימת פסקי דין לפי מטאדאטא. סינון אופציונלי לפי בית משפט, נושא, שופט, תאריכים, תוצאה.""" - return await decisions.decision_list( - court, topic, judge, date_from, date_to, outcome, limit, - ) - - -@mcp.tool() -async def decision_get( - decision_id: str = "", - case_number: str = "", -) -> str: - """פרטי פסק דין מלאים כולל טקסט מחולץ. חיפוש לפי decision_id או case_number.""" - return await decisions.decision_get(decision_id, case_number) - - -@mcp.tool() -async def decision_update( - decision_id: str, - title: str = "", - court: str = "", - decision_date: str = "", - case_number: str = "", - judge: str = "", - parties_appellant: str = "", - parties_respondent: str = "", - topics: list[str] | None = None, - outcome: str = "", - summary: str = "", -) -> str: - """עדכון מטאדאטא של פסק דין.""" - return await decisions.decision_update( - decision_id, title, court, decision_date, case_number, - judge, parties_appellant, parties_respondent, topics, outcome, summary, - ) - - -@mcp.tool() -async def decision_delete(decision_id: str) -> str: - """מחיקת פסק דין וכל ה-chunks שלו מהמערכת.""" - return await decisions.decision_delete(decision_id) - - -@mcp.tool() -async def system_status() -> str: - """סטטוס מערכת דין לאומי - מספר פסקי דין, chunks, סטטיסטיקות.""" - return await search.system_status() - - -def main(): - mcp.run(transport="stdio") - - -if __name__ == "__main__": - main() diff --git a/din-leumi/src/din_leumi/services/__init__.py b/din-leumi/src/din_leumi/services/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/din-leumi/src/din_leumi/services/chunker.py b/din-leumi/src/din_leumi/services/chunker.py deleted file mode 100644 index 3fe7016..0000000 --- a/din-leumi/src/din_leumi/services/chunker.py +++ /dev/null @@ -1,132 +0,0 @@ -"""Legal document chunker - splits text into sections and chunks for RAG. - -Adapted for National Insurance (Bituach Leumi) court decisions. -""" - -from __future__ import annotations - -import re -from dataclasses import dataclass - -from din_leumi import config - -# Hebrew legal section headers for NI court decisions -SECTION_PATTERNS = [ - (r"רקע\s*עובדתי|רקע\s*כללי|העובדות|הרקע|עובדות\s*המקרה", "facts"), - (r"טענות\s*התובע[ת]?|טענות\s*המבוטח[ת]?|טענות\s*המערער[ת]?|עיקר\s*טענות\s*התובע", "claimant_claims"), - (r"טענות\s*הנתבע[ת]?|טענות\s*המוסד|עיקר\s*טענות\s*הנתבע|תשובת\s*המוסד", "respondent_claims"), - (r"דיון\s*והכרעה|דיון|הכרעה|ניתוח\s*משפטי|המסגרת\s*המשפטית|הכרעת\s*הדין", "legal_analysis"), - (r"מסקנ[הות]|סיכום", "conclusion"), - (r"סוף\s*דבר|לסיכום|התוצאה|אשר\s*על\s*כן", "ruling"), - (r"מבוא|פתיחה|לפניי|לפני[נו]?", "intro"), - (r"הדין\s*החל|המסגרת\s*הנורמטיבית|הוראות\s*החוק", "legal_framework"), -] - - -@dataclass -class Chunk: - content: str - section_type: str = "other" - page_number: int | None = None - chunk_index: int = 0 - - -def chunk_document( - text: str, - chunk_size: int = config.CHUNK_SIZE_TOKENS, - overlap: int = config.CHUNK_OVERLAP_TOKENS, -) -> list[Chunk]: - """Split a legal document into chunks, respecting section boundaries.""" - if not text.strip(): - return [] - - sections = _split_into_sections(text) - chunks: list[Chunk] = [] - idx = 0 - - for section_type, section_text in sections: - section_chunks = _split_section(section_text, chunk_size, overlap) - for chunk_text in section_chunks: - chunks.append(Chunk( - content=chunk_text, - section_type=section_type, - chunk_index=idx, - )) - idx += 1 - - return chunks - - -def _split_into_sections(text: str) -> list[tuple[str, str]]: - """Split text into (section_type, text) pairs based on Hebrew headers.""" - markers: list[tuple[int, str]] = [] - - for pattern, section_type in SECTION_PATTERNS: - for match in re.finditer(pattern, text): - markers.append((match.start(), section_type)) - - if not markers: - return [("other", text)] - - markers.sort(key=lambda x: x[0]) - - sections: list[tuple[str, str]] = [] - - # Text before first section - if markers[0][0] > 0: - intro_text = text[: markers[0][0]].strip() - if intro_text: - sections.append(("intro", intro_text)) - - # Each section - for i, (pos, section_type) in enumerate(markers): - end = markers[i + 1][0] if i + 1 < len(markers) else len(text) - section_text = text[pos:end].strip() - if section_text: - sections.append((section_type, section_text)) - - return sections - - -def _split_section(text: str, chunk_size: int, overlap: int) -> list[str]: - """Split a section into overlapping chunks by paragraphs. - - Uses approximate token counting (Hebrew ~1.5 chars per token). - """ - if not text.strip(): - return [] - - paragraphs = [p.strip() for p in text.split("\n") if p.strip()] - chunks: list[str] = [] - current: list[str] = [] - current_tokens = 0 - - for para in paragraphs: - para_tokens = _estimate_tokens(para) - - if current_tokens + para_tokens > chunk_size and current: - chunks.append("\n".join(current)) - # Keep overlap - overlap_paras: list[str] = [] - overlap_tokens = 0 - for p in reversed(current): - pt = _estimate_tokens(p) - if overlap_tokens + pt > overlap: - break - overlap_paras.insert(0, p) - overlap_tokens += pt - current = overlap_paras - current_tokens = overlap_tokens - - current.append(para) - current_tokens += para_tokens - - if current: - chunks.append("\n".join(current)) - - return chunks - - -def _estimate_tokens(text: str) -> int: - """Rough token estimate for Hebrew text (~1.5 chars per token).""" - return max(1, len(text) // 2) diff --git a/din-leumi/src/din_leumi/services/db.py b/din-leumi/src/din_leumi/services/db.py deleted file mode 100644 index ffdeac2..0000000 --- a/din-leumi/src/din_leumi/services/db.py +++ /dev/null @@ -1,374 +0,0 @@ -"""Database service - asyncpg connection pool and queries for din-leumi.""" - -from __future__ import annotations - -import json -import logging -from datetime import date -from uuid import UUID, uuid4 - -import asyncpg -from pgvector.asyncpg import register_vector - -from din_leumi import config - -logger = logging.getLogger(__name__) - -_pool: asyncpg.Pool | None = None - - -async def get_pool() -> asyncpg.Pool: - global _pool - if _pool is None: - conn = await asyncpg.connect(config.POSTGRES_URL) - await conn.execute('CREATE EXTENSION IF NOT EXISTS vector') - await conn.execute('CREATE EXTENSION IF NOT EXISTS "uuid-ossp"') - await conn.close() - - _pool = await asyncpg.create_pool( - config.POSTGRES_URL, - min_size=2, - max_size=10, - init=_init_connection, - ) - return _pool - - -async def _init_connection(conn: asyncpg.Connection) -> None: - await register_vector(conn) - - -async def close_pool() -> None: - global _pool - if _pool: - await _pool.close() - _pool = None - - -# ── Schema ────────────────────────────────────────────────────────── - -SCHEMA_SQL = """ - -CREATE TABLE IF NOT EXISTS decisions ( - id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), - title TEXT NOT NULL, - file_path TEXT NOT NULL, - extracted_text TEXT DEFAULT '', - extraction_status TEXT DEFAULT 'pending', - - court TEXT DEFAULT '', - decision_date DATE, - case_number TEXT DEFAULT '', - judge TEXT DEFAULT '', - parties_appellant TEXT DEFAULT '', - parties_respondent TEXT DEFAULT 'המוסד לביטוח לאומי', - topics JSONB DEFAULT '[]', - outcome TEXT DEFAULT '', - summary TEXT DEFAULT '', - - page_count INTEGER, - created_at TIMESTAMPTZ DEFAULT now(), - updated_at TIMESTAMPTZ DEFAULT now() -); - -CREATE TABLE IF NOT EXISTS decision_chunks ( - id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), - decision_id UUID REFERENCES decisions(id) ON DELETE CASCADE, - chunk_index INTEGER NOT NULL, - content TEXT NOT NULL, - section_type TEXT DEFAULT 'other', - embedding vector(1024), - page_number INTEGER, - created_at TIMESTAMPTZ DEFAULT now() -); - -CREATE INDEX IF NOT EXISTS idx_chunks_decision ON decision_chunks(decision_id); -CREATE INDEX IF NOT EXISTS idx_decisions_court ON decisions(court); -CREATE INDEX IF NOT EXISTS idx_decisions_date ON decisions(decision_date); -CREATE INDEX IF NOT EXISTS idx_decisions_case_number ON decisions(case_number); -CREATE INDEX IF NOT EXISTS idx_decisions_topics ON decisions USING gin(topics); -""" - -# IVFFlat index requires data to exist; create after first batch of decisions -IVFFLAT_INDEX_SQL = """ -CREATE INDEX IF NOT EXISTS idx_chunks_embedding - ON decision_chunks USING ivfflat (embedding vector_cosine_ops) - WITH (lists = 100); -""" - - -async def init_schema() -> None: - pool = await get_pool() - async with pool.acquire() as conn: - await conn.execute(SCHEMA_SQL) - # Check if we have enough data for IVFFlat - count = await conn.fetchval("SELECT count(*) FROM decision_chunks") - if count and count > 100: - await conn.execute(IVFFLAT_INDEX_SQL) - logger.info("IVFFlat index created (%d chunks)", count) - logger.info("Database schema initialized") - - -async def ensure_ivfflat_index() -> None: - """Create IVFFlat index if enough data exists.""" - pool = await get_pool() - async with pool.acquire() as conn: - count = await conn.fetchval("SELECT count(*) FROM decision_chunks") - if count and count > 100: - await conn.execute(IVFFLAT_INDEX_SQL) - - -# ── Decision CRUD ─────────────────────────────────────────────────── - -async def create_decision( - title: str, - file_path: str, - court: str = "", - decision_date: date | None = None, - case_number: str = "", - judge: str = "", - parties_appellant: str = "", - parties_respondent: str = "המוסד לביטוח לאומי", - topics: list[str] | None = None, - outcome: str = "", -) -> dict: - pool = await get_pool() - decision_id = uuid4() - async with pool.acquire() as conn: - await conn.execute( - """INSERT INTO decisions (id, title, file_path, court, decision_date, - case_number, judge, parties_appellant, parties_respondent, - topics, outcome) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)""", - decision_id, title, file_path, court, decision_date, - case_number, judge, parties_appellant, parties_respondent, - json.dumps(topics or []), outcome, - ) - return await get_decision(decision_id) - - -async def get_decision(decision_id: UUID) -> dict | None: - pool = await get_pool() - async with pool.acquire() as conn: - row = await conn.fetchrow("SELECT * FROM decisions WHERE id = $1", decision_id) - if row is None: - return None - return _row_to_decision(row) - - -async def get_decision_by_case_number(case_number: str) -> dict | None: - pool = await get_pool() - async with pool.acquire() as conn: - row = await conn.fetchrow( - "SELECT * FROM decisions WHERE case_number = $1", case_number - ) - if row is None: - return None - return _row_to_decision(row) - - -async def list_decisions( - court: str = "", - topic: str = "", - judge: str = "", - date_from: date | None = None, - date_to: date | None = None, - outcome: str = "", - limit: int = 50, -) -> list[dict]: - pool = await get_pool() - conditions = [] - params: list = [] - idx = 1 - - if court: - conditions.append(f"court ILIKE ${idx}") - params.append(f"%{court}%") - idx += 1 - if topic: - conditions.append(f"topics @> ${idx}::jsonb") - params.append(json.dumps([topic])) - idx += 1 - if judge: - conditions.append(f"judge ILIKE ${idx}") - params.append(f"%{judge}%") - idx += 1 - if date_from: - conditions.append(f"decision_date >= ${idx}") - params.append(date_from) - idx += 1 - if date_to: - conditions.append(f"decision_date <= ${idx}") - params.append(date_to) - idx += 1 - if outcome: - conditions.append(f"outcome = ${idx}") - params.append(outcome) - idx += 1 - - where = f"WHERE {' AND '.join(conditions)}" if conditions else "" - params.append(limit) - - sql = f""" - SELECT id, title, court, decision_date, case_number, judge, - parties_appellant, parties_respondent, topics, outcome, summary, - extraction_status, page_count, created_at - FROM decisions - {where} - ORDER BY decision_date DESC NULLS LAST, created_at DESC - LIMIT ${idx} - """ - async with pool.acquire() as conn: - rows = await conn.fetch(sql, *params) - return [_row_to_decision(r) for r in rows] - - -async def update_decision(decision_id: UUID, **fields) -> dict | None: - if not fields: - return await get_decision(decision_id) - pool = await get_pool() - set_clauses = [] - values = [] - for i, (key, val) in enumerate(fields.items(), start=2): - if key == "topics": - val = json.dumps(val) - set_clauses.append(f"{key} = ${i}") - values.append(val) - set_clauses.append("updated_at = now()") - sql = f"UPDATE decisions SET {', '.join(set_clauses)} WHERE id = $1" - async with pool.acquire() as conn: - await conn.execute(sql, decision_id, *values) - return await get_decision(decision_id) - - -async def delete_decision(decision_id: UUID) -> bool: - pool = await get_pool() - async with pool.acquire() as conn: - result = await conn.execute("DELETE FROM decisions WHERE id = $1", decision_id) - return result == "DELETE 1" - - -def _row_to_decision(row: asyncpg.Record) -> dict: - d = dict(row) - if isinstance(d.get("topics"), str): - d["topics"] = json.loads(d["topics"]) - for field in ("id",): - if field in d and d[field] is not None: - d[field] = str(d[field]) - return d - - -# ── Chunks & Vectors ─────────────────────────────────────────────── - -async def store_chunks( - decision_id: UUID, - chunks: list[dict], -) -> int: - """Store decision chunks with embeddings.""" - pool = await get_pool() - async with pool.acquire() as conn: - await conn.execute( - "DELETE FROM decision_chunks WHERE decision_id = $1", decision_id - ) - for chunk in chunks: - await conn.execute( - """INSERT INTO decision_chunks - (decision_id, chunk_index, content, section_type, embedding, page_number) - VALUES ($1, $2, $3, $4, $5, $6)""", - decision_id, - chunk["chunk_index"], - chunk["content"], - chunk.get("section_type", "other"), - chunk["embedding"], - chunk.get("page_number"), - ) - return len(chunks) - - -async def search_similar( - query_embedding: list[float], - limit: int = 10, - court: str = "", - topic: str = "", - date_from: date | None = None, - date_to: date | None = None, - outcome: str = "", -) -> list[dict]: - """Cosine similarity search on decision chunks with metadata filtering.""" - pool = await get_pool() - conditions = [] - params: list = [query_embedding, limit] - idx = 3 - - if court: - conditions.append(f"d.court ILIKE ${idx}") - params.append(f"%{court}%") - idx += 1 - if topic: - conditions.append(f"d.topics @> ${idx}::jsonb") - params.append(json.dumps([topic])) - idx += 1 - if date_from: - conditions.append(f"d.decision_date >= ${idx}") - params.append(date_from) - idx += 1 - if date_to: - conditions.append(f"d.decision_date <= ${idx}") - params.append(date_to) - idx += 1 - if outcome: - conditions.append(f"d.outcome = ${idx}") - params.append(outcome) - idx += 1 - - where = f"WHERE {' AND '.join(conditions)}" if conditions else "" - - sql = f""" - SELECT dc.content, dc.section_type, dc.page_number, - dc.decision_id, - d.title, d.case_number, d.court, d.decision_date, - d.judge, d.outcome, - 1 - (dc.embedding <=> $1) AS score - FROM decision_chunks dc - JOIN decisions d ON d.id = dc.decision_id - {where} - ORDER BY dc.embedding <=> $1 - LIMIT $2 - """ - async with pool.acquire() as conn: - rows = await conn.fetch(sql, *params) - results = [] - for r in rows: - d = dict(r) - if d.get("decision_id"): - d["decision_id"] = str(d["decision_id"]) - results.append(d) - return results - - -# ── Stats ────────────────────────────────────────────────────────── - -async def get_stats() -> dict: - pool = await get_pool() - async with pool.acquire() as conn: - total_decisions = await conn.fetchval("SELECT count(*) FROM decisions") - total_chunks = await conn.fetchval("SELECT count(*) FROM decision_chunks") - completed = await conn.fetchval( - "SELECT count(*) FROM decisions WHERE extraction_status = 'completed'" - ) - courts = await conn.fetch( - "SELECT court, count(*) as cnt FROM decisions WHERE court != '' GROUP BY court ORDER BY cnt DESC LIMIT 10" - ) - date_range = await conn.fetchrow( - "SELECT min(decision_date) as earliest, max(decision_date) as latest FROM decisions WHERE decision_date IS NOT NULL" - ) - return { - "total_decisions": total_decisions, - "completed_decisions": completed, - "total_chunks": total_chunks, - "courts": [{"court": r["court"], "count": r["cnt"]} for r in courts], - "date_range": { - "earliest": str(date_range["earliest"]) if date_range and date_range["earliest"] else None, - "latest": str(date_range["latest"]) if date_range and date_range["latest"] else None, - } if date_range else None, - } diff --git a/din-leumi/src/din_leumi/services/embeddings.py b/din-leumi/src/din_leumi/services/embeddings.py deleted file mode 100644 index 43e6969..0000000 --- a/din-leumi/src/din_leumi/services/embeddings.py +++ /dev/null @@ -1,55 +0,0 @@ -"""Embedding service using Voyage AI API.""" - -from __future__ import annotations - -import logging - -import voyageai - -from din_leumi import config - -logger = logging.getLogger(__name__) - -_client: voyageai.Client | None = None - - -def _get_client() -> voyageai.Client: - global _client - if _client is None: - _client = voyageai.Client(api_key=config.VOYAGE_API_KEY) - return _client - - -async def embed_texts(texts: list[str], input_type: str = "document") -> list[list[float]]: - """Embed a batch of texts using Voyage AI. - - Args: - texts: List of texts to embed (max 128 per call). - input_type: "document" for indexing, "query" for search queries. - - Returns: - List of embedding vectors (1024 dimensions each). - """ - if not texts: - return [] - - client = _get_client() - all_embeddings = [] - - # Voyage AI supports up to 128 texts per batch - for i in range(0, len(texts), 128): - batch = texts[i : i + 128] - result = client.embed( - batch, - model=config.VOYAGE_MODEL, - input_type=input_type, - ) - all_embeddings.extend(result.embeddings) - - return all_embeddings - - -async def embed_query(query: str) -> list[float]: - """Embed a single search query.""" - results = await embed_texts([query], input_type="query") - return results[0] diff --git a/din-leumi/src/din_leumi/services/extractor.py b/din-leumi/src/din_leumi/services/extractor.py deleted file mode 100644 index d5525ee..0000000 --- a/din-leumi/src/din_leumi/services/extractor.py +++ /dev/null @@ -1,126 +0,0 @@ -"""Text extraction from PDF, DOCX, and RTF files. - -Primary PDF extraction: Claude Vision API (for scanned documents). -Fallback: PyMuPDF direct text extraction (for born-digital PDFs). -""" - -from __future__ import annotations - -import base64 -import logging -from pathlib import Path - -import anthropic -import fitz # PyMuPDF -from docx import Document as DocxDocument -from striprtf.striprtf import rtf_to_text - -from din_leumi import config - -logger = logging.getLogger(__name__) - -_anthropic_client: anthropic.Anthropic | None = None - - -def _get_anthropic() -> anthropic.Anthropic: - global _anthropic_client - if _anthropic_client is None: - _anthropic_client = anthropic.Anthropic(api_key=config.ANTHROPIC_API_KEY) - return _anthropic_client - - -async def extract_text(file_path: str) -> tuple[str, int]: - """Extract text from a document file. - - Returns: - Tuple of (extracted_text, page_count). - page_count is 0 for non-PDF files. - """ - path = Path(file_path) - suffix = path.suffix.lower() - - if suffix == ".pdf": - return await _extract_pdf(path) - elif suffix == ".docx": - return _extract_docx(path), 0 - elif suffix == ".rtf": - return _extract_rtf(path), 0 - elif suffix == ".txt": - return path.read_text(encoding="utf-8"), 0 - else: - raise ValueError(f"Unsupported file type: {suffix}") - - -async def _extract_pdf(path: Path) -> tuple[str, int]: - """Extract text from PDF. Try direct text first, fall back to Claude Vision for scanned pages.""" - doc = fitz.open(str(path)) - page_count = len(doc) - pages_text: list[str] = [] - - for page_num in range(page_count): - page = doc[page_num] - # Try direct text extraction first - text = page.get_text().strip() - - if len(text) > 50: - # Sufficient text found - born-digital page - pages_text.append(text) - logger.debug("Page %d: direct text extraction (%d chars)", page_num + 1, len(text)) - else: - # Likely scanned - use Claude Vision - logger.info("Page %d: using Claude Vision OCR", page_num + 1) - pix = page.get_pixmap(dpi=200) - img_bytes = pix.tobytes("png") - ocr_text = await _ocr_with_claude(img_bytes, page_num + 1) - pages_text.append(ocr_text) - - doc.close() - return "\n\n".join(pages_text), page_count - - -async def _ocr_with_claude(image_bytes: bytes, page_num: int) -> str: - """OCR a single page image using Claude Vision API.""" - client = _get_anthropic() - b64_image = base64.b64encode(image_bytes).decode("utf-8") - - message = client.messages.create( - model="claude-sonnet-4-20250514", - max_tokens=4096, - messages=[ - { - "role": "user", - "content": [ - { - "type": "image", - "source": { - "type": "base64", - "media_type": "image/png", - "data": b64_image, - }, - }, - { - "type": "text", - "text": ( - "חלץ את כל הטקסט מהתמונה הזו. זהו מסמך משפטי בעברית. " - "שמור על מבנה הפסקאות המקורי. " - "החזר רק את הטקסט המחולץ, ללא הערות נוספות." - ), - }, - ], - } - ], - ) - return message.content[0].text - - -def _extract_docx(path: Path) -> str: - """Extract text from DOCX file.""" - doc = DocxDocument(str(path)) - paragraphs = [p.text for p in doc.paragraphs if p.text.strip()] - return "\n\n".join(paragraphs) - - -def _extract_rtf(path: Path) -> str: - """Extract text from RTF file.""" - rtf_content = path.read_text(encoding="utf-8", errors="replace") - return rtf_to_text(rtf_content) diff --git a/din-leumi/src/din_leumi/services/processor.py b/din-leumi/src/din_leumi/services/processor.py deleted file mode 100644 index 829af69..0000000 --- a/din-leumi/src/din_leumi/services/processor.py +++ /dev/null @@ -1,82 +0,0 @@ -"""Decision processing pipeline: extract → chunk → embed → store.""" - -from __future__ import annotations - -import logging -from uuid import UUID - -from din_leumi.services import chunker, db, embeddings, extractor - -logger = logging.getLogger(__name__) - - -async def process_decision(decision_id: UUID) -> dict: - """Full processing pipeline for a decision. - - 1. Extract text from file - 2. Split into chunks - 3. Generate embeddings - 4. Store chunks + embeddings in DB - - Returns processing summary. - """ - decision = await db.get_decision(decision_id) - if not decision: - raise ValueError(f"Decision {decision_id} not found") - - await db.update_decision(decision_id, extraction_status="processing") - - try: - # Step 1: Extract text - logger.info("Extracting text from %s", decision["file_path"]) - text, page_count = await extractor.extract_text(decision["file_path"]) - - await db.update_decision( - decision_id, - extracted_text=text, - page_count=page_count, - ) - - # Step 2: Chunk - logger.info("Chunking decision (%d chars)", len(text)) - chunks = chunker.chunk_document(text) - - if not chunks: - await db.update_decision(decision_id, extraction_status="completed") - return {"status": "completed", "chunks": 0, "message": "No text to chunk"} - - # Step 3: Embed - logger.info("Generating embeddings for %d chunks", len(chunks)) - texts = [c.content for c in chunks] - embs = await embeddings.embed_texts(texts, input_type="document") - - # Step 4: Store - chunk_dicts = [ - { - "content": c.content, - "section_type": c.section_type, - "embedding": emb, - "page_number": c.page_number, - "chunk_index": c.chunk_index, - } - for c, emb in zip(chunks, embs) - ] - - stored = await db.store_chunks(decision_id, chunk_dicts) - await db.update_decision(decision_id, extraction_status="completed") - - # Try to create IVFFlat index if we have enough data - await db.ensure_ivfflat_index() - - logger.info("Decision processed: %d chunks stored", stored) - return { - "status": "completed", - "chunks": stored, - "pages": page_count, - "text_length": len(text), - } - - except Exception as e: - logger.exception("Decision processing failed: %s", e) - await db.update_decision(decision_id, extraction_status="failed") - return {"status": "failed", "error": str(e)} diff --git a/din-leumi/src/din_leumi/tools/__init__.py b/din-leumi/src/din_leumi/tools/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/din-leumi/src/din_leumi/tools/decisions.py b/din-leumi/src/din_leumi/tools/decisions.py deleted file mode 100644 index a193da1..0000000 --- a/din-leumi/src/din_leumi/tools/decisions.py +++ /dev/null @@ -1,241 +0,0 @@ -"""Decision CRUD tool implementations.""" - -from __future__ import annotations - -import json -import shutil -from datetime import date -from pathlib import Path -from uuid import UUID - -from din_leumi import config -from din_leumi.services import db, processor - - -async def decision_upload( - file_path: str, - title: str = "", - court: str = "", - decision_date: str = "", - case_number: str = "", - judge: str = "", - parties_appellant: str = "", - parties_respondent: str = "המוסד לביטוח לאומי", - topics: list[str] | None = None, - outcome: str = "", -) -> str: - """Upload and process a court decision.""" - source = Path(file_path) - if not source.exists(): - return f"❌ הקובץ לא נמצא: {file_path}" - - ext = source.suffix.lower() - if ext not in (".pdf", ".docx", ".rtf", ".txt"): - return f"❌ סוג קובץ לא נתמך: {ext}" - - # Copy to decisions directory - config.DECISIONS_DIR.mkdir(parents=True, exist_ok=True) - dest = config.DECISIONS_DIR / source.name - if dest.exists(): - # Add suffix to avoid overwrite - dest = config.DECISIONS_DIR / f"{source.stem}_{UUID.__class__.__name__[:8]}{ext}" - shutil.copy2(str(source), str(dest)) - - # Parse date - d_date = None - if decision_date: - try: - d_date = date.fromisoformat(decision_date) - except ValueError: - return f"❌ פורמט תאריך לא תקין: {decision_date}. נדרש YYYY-MM-DD" - - # Create decision record - if not title: - title = source.stem - - decision = await db.create_decision( - title=title, - file_path=str(dest), - court=court, - decision_date=d_date, - case_number=case_number, - judge=judge, - parties_appellant=parties_appellant, - parties_respondent=parties_respondent, - topics=topics, - outcome=outcome, - ) - - # Process (extract → chunk → embed → store) - result = await processor.process_decision(UUID(decision["id"])) - - status_icon = "✅" if result["status"] == "completed" else "❌" - lines = [ - f"{status_icon} פסק דין הועלה ועובד", - f" כותרת: {title}", - f" מזהה: {decision['id']}", - ] - if case_number: - lines.append(f" מספר תיק: {case_number}") - if result.get("chunks"): - lines.append(f" chunks: {result['chunks']}") - if result.get("pages"): - lines.append(f" עמודים: {result['pages']}") - if result.get("error"): - lines.append(f" שגיאה: {result['error']}") - - return "\n".join(lines) - - -async def decision_get( - decision_id: str = "", - case_number: str = "", -) -> str: - """Get full decision details.""" - if not decision_id and not case_number: - return "❌ נדרש decision_id או case_number" - - if decision_id: - decision = await db.get_decision(UUID(decision_id)) - else: - decision = await db.get_decision_by_case_number(case_number) - - if not decision: - return "❌ פסק דין לא נמצא" - - lines = [ - f"📄 {decision.get('title', '')}", - f" מזהה: {decision['id']}", - ] - if decision.get("case_number"): - lines.append(f" מספר תיק: {decision['case_number']}") - if decision.get("court"): - lines.append(f" בית משפט: {decision['court']}") - if decision.get("decision_date"): - lines.append(f" תאריך: {decision['decision_date']}") - if decision.get("judge"): - lines.append(f" שופט: {decision['judge']}") - if decision.get("parties_appellant"): - lines.append(f" תובע/מערער: {decision['parties_appellant']}") - if decision.get("parties_respondent"): - lines.append(f" נתבע/משיב: {decision['parties_respondent']}") - if decision.get("topics"): - topics = decision["topics"] - if isinstance(topics, str): - topics = json.loads(topics) - if topics: - lines.append(f" נושאים: {', '.join(topics)}") - if decision.get("outcome"): - lines.append(f" תוצאה: {decision['outcome']}") - if decision.get("summary"): - lines.append(f" תקציר: {decision['summary']}") - - lines.append(f" סטטוס: {decision.get('extraction_status', 'unknown')}") - if decision.get("page_count"): - lines.append(f" עמודים: {decision['page_count']}") - - # Include extracted text (truncated) - text = decision.get("extracted_text", "") - if text: - lines.append("") - lines.append("── טקסט מחולץ ──") - if len(text) > 15000: - lines.append(text[:15000]) - lines.append(f"\n... (נקטע, סה\"כ {len(text)} תווים)") - else: - lines.append(text) - - return "\n".join(lines) - - -async def decision_list( - court: str = "", - topic: str = "", - judge: str = "", - date_from: str = "", - date_to: str = "", - outcome: str = "", - limit: int = 50, -) -> str: - """List decisions with optional filters.""" - d_from = date.fromisoformat(date_from) if date_from else None - d_to = date.fromisoformat(date_to) if date_to else None - - decisions = await db.list_decisions( - court=court, topic=topic, judge=judge, - date_from=d_from, date_to=d_to, - outcome=outcome, limit=limit, - ) - - if not decisions: - return "לא נמצאו פסקי דין" - - lines = [f"נמצאו {len(decisions)} פסקי דין:\n"] - for d in decisions: - parts = [f"• {d.get('title', 'ללא כותרת')}"] - if d.get("case_number"): - parts.append(f" [{d['case_number']}]") - if d.get("court"): - parts.append(f" {d['court']}") - if d.get("decision_date"): - parts.append(f" {d['decision_date']}") - if d.get("outcome"): - parts.append(f" ({d['outcome']})") - lines.append(" ".join(parts)) - lines.append(f" מזהה: {d['id']}") - - return "\n".join(lines) - - -async def decision_update( - decision_id: str, - title: str = "", - court: str = "", - decision_date: str = "", - case_number: str = "", - judge: str = "", - parties_appellant: str = "", - parties_respondent: str = "", - topics: list[str] | None = None, - outcome: str = "", - summary: str = "", -) -> str: - """Update decision metadata.""" - fields = {} - if title: - fields["title"] = title - if court: - fields["court"] = court - if decision_date: - fields["decision_date"] = date.fromisoformat(decision_date) - if case_number: - fields["case_number"] = case_number - if judge: - fields["judge"] = judge - if parties_appellant: - fields["parties_appellant"] = parties_appellant - if parties_respondent: - fields["parties_respondent"] = parties_respondent - if topics is not None: - fields["topics"] = topics - if outcome: - fields["outcome"] = outcome - if summary: - fields["summary"] = summary - - if not fields: - return "❌ לא צוינו שדות לעדכון" - - result = await db.update_decision(UUID(decision_id), **fields) - if not result: - return "❌ פסק דין לא נמצא" - - return f"✅ פסק דין {decision_id} עודכן ({', '.join(fields.keys())})" - - -async def decision_delete(decision_id: str) -> str: - """Delete a decision and all its chunks.""" - deleted = await db.delete_decision(UUID(decision_id)) - if deleted: - return f"✅ פסק דין {decision_id} נמחק" - return "❌ פסק דין לא נמצא" diff --git a/din-leumi/src/din_leumi/tools/search.py b/din-leumi/src/din_leumi/tools/search.py deleted file mode 100644 index 65ed808..0000000 --- a/din-leumi/src/din_leumi/tools/search.py +++ /dev/null @@ -1,97 +0,0 @@ -"""Search tool implementations.""" - -from __future__ import annotations - -import json -from datetime import date - -from din_leumi.services import db, embeddings - - -async def decision_search( - query: str, - limit: int = 10, - court: str = "", - topic: str = "", - date_from: str = "", - date_to: str = "", - outcome: str = "", -) -> str: - """Semantic search across all decisions with optional metadata filters.""" - if not query.strip(): - return "❌ נדרש טקסט לחיפוש" - - # Embed query - query_emb = await embeddings.embed_query(query) - - d_from = date.fromisoformat(date_from) if date_from else None - d_to = date.fromisoformat(date_to) if date_to else None - - results = await db.search_similar( - query_embedding=query_emb, - limit=limit, - court=court, - topic=topic, - date_from=d_from, - date_to=d_to, - outcome=outcome, - ) - - if not results: - return "לא נמצאו תוצאות" - - lines = [f"🔍 נמצאו {len(results)} תוצאות עבור: \"{query}\"\n"] - - for i, r in enumerate(results, 1): - score = r.get("score", 0) - lines.append(f"── תוצאה {i} (ציון: {score:.3f}) ──") - lines.append(f" פסק דין: {r.get('title', '')}") - if r.get("case_number"): - lines.append(f" מספר תיק: {r['case_number']}") - if r.get("court"): - lines.append(f" בית משפט: {r['court']}") - if r.get("decision_date"): - lines.append(f" תאריך: {r['decision_date']}") - if r.get("judge"): - lines.append(f" שופט: {r['judge']}") - if r.get("outcome"): - lines.append(f" תוצאה: {r['outcome']}") - lines.append(f" סוג קטע: {r.get('section_type', 'other')}") - lines.append(f" מזהה: {r.get('decision_id', '')}") - lines.append("") - - # Show content snippet - content = r.get("content", "") - if len(content) > 500: - content = content[:500] + "..." - lines.append(f" {content}") - lines.append("") - - return "\n".join(lines) - - -async def system_status() -> str: - """Get system statistics.""" - stats = await db.get_stats() - - lines = [ - "📊 סטטוס מערכת דין לאומי", - "", - f" סה\"כ פסקי דין: {stats['total_decisions']}", - f" עובדו בהצלחה: {stats['completed_decisions']}", - f" סה\"כ chunks: {stats['total_chunks']}", - ] - - if stats.get("courts"): - lines.append("") - lines.append(" בתי משפט:") - for c in stats["courts"]: - lines.append(f" • {c['court']}: {c['count']}") - - if stats.get("date_range"): - dr = stats["date_range"] - if dr.get("earliest"): - lines.append("") - lines.append(f" טווח תאריכים: {dr['earliest']} — {dr['latest']}") - - return "\n".join(lines) diff --git a/web/app.py b/web/app.py index de13b1e..80bb096 100644 --- a/web/app.py +++ b/web/app.py @@ -16,8 +16,6 @@ from uuid import UUID, uuid4 # Allow importing legal_mcp from the MCP server source sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "mcp-server" / "src")) -# Allow importing din_leumi from its MCP server source -sys.path.insert(0, str(Path.home() / "din-leumi" / "mcp-server" / "src")) from fastapi import FastAPI, File, HTTPException, UploadFile from fastapi.responses import FileResponse, StreamingResponse @@ -28,13 +26,6 @@ from legal_mcp import config from legal_mcp.services import chunker, db, embeddings, extractor, processor from legal_mcp.tools import cases as cases_tools, search as search_tools, workflow as workflow_tools, drafting as drafting_tools -# Din Leumi imports (aliased to avoid collision) -from din_leumi import config as dl_config -from din_leumi.services import db as dl_db -from din_leumi.services import processor as dl_processor -from din_leumi.services import extractor as dl_extractor - -import anthropic logger = logging.getLogger(__name__) @@ -49,12 +40,9 @@ _progress: dict[str, dict] = {} @asynccontextmanager async def lifespan(app: FastAPI): UPLOAD_DIR.mkdir(parents=True, exist_ok=True) - dl_config.DECISIONS_DIR.mkdir(parents=True, exist_ok=True) await db.init_schema() - await dl_db.init_schema() yield await db.close_pool() - await dl_db.close_pool() app = FastAPI(title="העלאת מסמכים משפטיים", lifespan=lifespan) @@ -558,94 +546,6 @@ async def api_document_text(doc_id: str): return {"doc_id": doc_id, "text": text} -# ── Din Leumi Endpoint ──────────────────────────────────────────── - - -class DinLeumiRequest(BaseModel): - filename: str - title: str = "" - - -@app.post("/api/classify-dinleumi") -async def classify_dinleumi(req: DinLeumiRequest): - """Upload a decision to Din Leumi with auto metadata extraction.""" - source = UPLOAD_DIR / req.filename - if not source.exists() or not source.parent.samefile(UPLOAD_DIR): - raise HTTPException(404, "File not found in uploads") - - task_id = str(uuid4()) - _progress[task_id] = {"status": "queued", "filename": req.filename} - - asyncio.create_task(_process_dinleumi_decision(task_id, source, req)) - - return {"task_id": task_id} - - -# ── Metadata Extraction ────────────────────────────────────────── - -METADATA_EXTRACTION_PROMPT = """אתה מנתח פסקי דין של בתי דין לעבודה בתחום ביטוח לאומי. -חלץ את המטאדאטא הבאה מתוך פסק הדין והחזר אותה כ-JSON בלבד: - -{ - "title": "כותרת תיאורית קצרה של פסק הדין", - "court": "שם בית המשפט (למשל: בית הדין האזורי לעבודה תל אביב)", - "decision_date": "YYYY-MM-DD או null אם לא נמצא", - "case_number": "מספר תיק (למשל: בל 12345-06-20)", - "judge": "שם השופט/ת", - "parties_appellant": "שם התובע/מערער", - "parties_respondent": "שם הנתבע/משיב", - "topics": ["רשימת נושאים רלוונטיים מתוך הרשימה למטה"], - "outcome": "accepted/rejected/partial/remanded", - "summary": "תקציר של 2-3 משפטים" -} - -נושאים אפשריים: נכות כללית, נכות מעבודה, תאונת עבודה, דמי לידה, דמי אבטלה, גמלת הבטחת הכנסה, גמלת ניידות, גמלת סיעוד, קצבת זקנה, קצבת שאירים, מילואים, דמי פגיעה, נפגעי פעולות איבה - -החזר JSON בלבד, ללא טקסט נוסף.""" - - -_anthropic_client: anthropic.Anthropic | None = None - - -def _get_anthropic() -> anthropic.Anthropic: - global _anthropic_client - if _anthropic_client is None: - _anthropic_client = anthropic.Anthropic(api_key=config.ANTHROPIC_API_KEY) - return _anthropic_client - - -async def _extract_metadata_with_claude(text: str) -> dict: - """Extract metadata from decision text using Claude.""" - client = _get_anthropic() - # Use first ~5000 chars (usually contains all metadata) - excerpt = text[:5000] - - message = client.messages.create( - model="claude-sonnet-4-20250514", - max_tokens=1024, - messages=[ - { - "role": "user", - "content": f"{METADATA_EXTRACTION_PROMPT}\n\nפסק הדין:\n{excerpt}", - } - ], - ) - - response_text = message.content[0].text.strip() - # Parse JSON from response (handle potential markdown wrapping) - if response_text.startswith("```"): - response_text = response_text.split("```")[1] - if response_text.startswith("json"): - response_text = response_text[4:] - try: - metadata = json.loads(response_text) - except json.JSONDecodeError: - logger.warning("Failed to parse metadata JSON: %s", response_text[:200]) - metadata = {} - - return metadata - - # ── Background Processing ───────────────────────────────────────── @@ -802,117 +702,3 @@ async def _process_training_document(task_id: str, source: Path, req: ClassifyRe "chunks": chunk_count, }, } - - -async def _process_dinleumi_decision(task_id: str, source: Path, req: DinLeumiRequest): - """Process a National Insurance court decision with auto metadata extraction.""" - from datetime import date as date_type - - try: - # Step 1: Copy to din-leumi decisions directory - _progress[task_id] = {"status": "copying", "filename": req.filename} - original_name = re.sub(r"^\d+_", "", source.name) - dest = dl_config.DECISIONS_DIR / original_name - if dest.exists(): - dest = dl_config.DECISIONS_DIR / f"{dest.stem}_{int(time.time())}{dest.suffix}" - shutil.copy2(str(source), str(dest)) - - # Step 2: Extract text - _progress[task_id] = {"status": "processing", "filename": req.filename, "step": "extracting"} - text, page_count = await dl_extractor.extract_text(str(dest)) - - # Step 3: Extract metadata with Claude - _progress[task_id] = {"status": "processing", "filename": req.filename, "step": "extracting_metadata"} - metadata = await _extract_metadata_with_claude(text) - - # Parse date - d_date = None - if metadata.get("decision_date"): - try: - d_date = date_type.fromisoformat(metadata["decision_date"]) - except (ValueError, TypeError): - d_date = None - - title = req.title or metadata.get("title", original_name.rsplit(".", 1)[0]) - - # Step 4: Create decision record - _progress[task_id] = {"status": "registering", "filename": req.filename} - decision = await dl_db.create_decision( - title=title, - file_path=str(dest), - court=metadata.get("court", ""), - decision_date=d_date, - case_number=metadata.get("case_number", ""), - judge=metadata.get("judge", ""), - parties_appellant=metadata.get("parties_appellant", ""), - parties_respondent=metadata.get("parties_respondent", "המוסד לביטוח לאומי"), - topics=metadata.get("topics"), - outcome=metadata.get("outcome", ""), - ) - - decision_id = UUID(decision["id"]) - - # Update with extracted text - await dl_db.update_decision( - decision_id, - extracted_text=text, - page_count=page_count, - summary=metadata.get("summary", ""), - ) - - # Step 5: Chunk - _progress[task_id] = {"status": "processing", "filename": req.filename, "step": "chunking"} - from din_leumi.services import chunker as dl_chunker, embeddings as dl_embeddings - chunks = dl_chunker.chunk_document(text) - - chunk_count = 0 - if chunks: - # Step 6: Embed - _progress[task_id] = {"status": "processing", "filename": req.filename, "step": "embedding"} - texts = [c.content for c in chunks] - embs = await dl_embeddings.embed_texts(texts, input_type="document") - - chunk_dicts = [ - { - "content": c.content, - "section_type": c.section_type, - "embedding": emb, - "page_number": c.page_number, - "chunk_index": c.chunk_index, - } - for c, emb in zip(chunks, embs) - ] - await dl_db.store_chunks(decision_id, chunk_dicts) - chunk_count = len(chunks) - - await dl_db.update_decision(decision_id, extraction_status="completed") - await dl_db.ensure_ivfflat_index() - - # Remove from uploads - source.unlink(missing_ok=True) - - _progress[task_id] = { - "status": "completed", - "filename": req.filename, - "system": "din-leumi", - "result": { - "decision_id": str(decision_id), - "title": title, - "pages": page_count, - "text_length": len(text), - "chunks": chunk_count, - }, - "metadata": { - "court": metadata.get("court", ""), - "judge": metadata.get("judge", ""), - "case_number": metadata.get("case_number", ""), - "decision_date": metadata.get("decision_date", ""), - "outcome": metadata.get("outcome", ""), - "topics": metadata.get("topics", []), - "summary": metadata.get("summary", ""), - }, - } - - except Exception as e: - logger.exception("Din Leumi processing failed for %s", req.filename) - _progress[task_id] = {"status": "failed", "error": str(e), "filename": req.filename}