Initial commit: din-leumi MCP server + web app
MCP server with 7 tools for cataloging and searching National Insurance court decisions with pgvector semantic search. Web interface for upload, search, and browse.
This commit is contained in:
0
mcp-server/src/din_leumi/services/__init__.py
Normal file
0
mcp-server/src/din_leumi/services/__init__.py
Normal file
132
mcp-server/src/din_leumi/services/chunker.py
Normal file
132
mcp-server/src/din_leumi/services/chunker.py
Normal file
@@ -0,0 +1,132 @@
|
||||
"""Legal document chunker - splits text into sections and chunks for RAG.
|
||||
|
||||
Adapted for National Insurance (Bituach Leumi) court decisions.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
|
||||
from din_leumi import config
|
||||
|
||||
# Hebrew section headings used in National Insurance court decisions.
# Each entry pairs a regex (alternatives for one kind of heading) with the
# section_type label given to the text that follows a match.
SECTION_PATTERNS = [
    (
        r"רקע\s*עובדתי|רקע\s*כללי|העובדות|הרקע|עובדות\s*המקרה",
        "facts",
    ),
    (
        r"טענות\s*התובע[ת]?|טענות\s*המבוטח[ת]?|טענות\s*המערער[ת]?|עיקר\s*טענות\s*התובע",
        "claimant_claims",
    ),
    (
        r"טענות\s*הנתבע[ת]?|טענות\s*המוסד|עיקר\s*טענות\s*הנתבע|תשובת\s*המוסד",
        "respondent_claims",
    ),
    (
        r"דיון\s*והכרעה|דיון|הכרעה|ניתוח\s*משפטי|המסגרת\s*המשפטית|הכרעת\s*הדין",
        "legal_analysis",
    ),
    (
        r"מסקנ[הות]|סיכום",
        "conclusion",
    ),
    (
        r"סוף\s*דבר|לסיכום|התוצאה|אשר\s*על\s*כן",
        "ruling",
    ),
    (
        r"מבוא|פתיחה|לפניי|לפני[נו]?",
        "intro",
    ),
    (
        r"הדין\s*החל|המסגרת\s*הנורמטיבית|הוראות\s*החוק",
        "legal_framework",
    ),
]
|
||||
|
||||
|
||||
@dataclass
class Chunk:
    """One piece of a decision's text, sized for embedding and retrieval."""

    # The chunk's text (paragraphs joined with newlines by the chunker).
    content: str
    # Label of the section the chunk came from; "other" when none was detected.
    section_type: str = "other"
    # Source page number, when known; chunk_document leaves this unset.
    page_number: int | None = None
    # Zero-based position of the chunk within the whole document.
    chunk_index: int = 0
|
||||
|
||||
|
||||
def chunk_document(
    text: str,
    chunk_size: int = config.CHUNK_SIZE_TOKENS,
    overlap: int = config.CHUNK_OVERLAP_TOKENS,
) -> list[Chunk]:
    """Break a legal document into section-aware chunks.

    The text is first divided into labelled sections, and each section is then
    cut into overlapping pieces. Chunks never span two sections.

    Args:
        text: Full document text.
        chunk_size: Approximate upper bound of a chunk, in estimated tokens.
        overlap: Approximate amount of trailing text, in estimated tokens,
            repeated at the start of the next chunk of the same section.

    Returns:
        Chunks in document order with consecutive ``chunk_index`` values
        starting at 0; an empty list for blank input.
    """
    if not text.strip():
        return []

    # Flatten (section -> pieces) into one stream so indices run document-wide.
    labelled_pieces = (
        (label, piece)
        for label, body in _split_into_sections(text)
        for piece in _split_section(body, chunk_size, overlap)
    )
    return [
        Chunk(content=piece, section_type=label, chunk_index=position)
        for position, (label, piece) in enumerate(labelled_pieces)
    ]
|
||||
|
||||
|
||||
def _split_into_sections(text: str) -> list[tuple[str, str]]:
|
||||
"""Split text into (section_type, text) pairs based on Hebrew headers."""
|
||||
markers: list[tuple[int, str]] = []
|
||||
|
||||
for pattern, section_type in SECTION_PATTERNS:
|
||||
for match in re.finditer(pattern, text):
|
||||
markers.append((match.start(), section_type))
|
||||
|
||||
if not markers:
|
||||
return [("other", text)]
|
||||
|
||||
markers.sort(key=lambda x: x[0])
|
||||
|
||||
sections: list[tuple[str, str]] = []
|
||||
|
||||
# Text before first section
|
||||
if markers[0][0] > 0:
|
||||
intro_text = text[: markers[0][0]].strip()
|
||||
if intro_text:
|
||||
sections.append(("intro", intro_text))
|
||||
|
||||
# Each section
|
||||
for i, (pos, section_type) in enumerate(markers):
|
||||
end = markers[i + 1][0] if i + 1 < len(markers) else len(text)
|
||||
section_text = text[pos:end].strip()
|
||||
if section_text:
|
||||
sections.append((section_type, section_text))
|
||||
|
||||
return sections
|
||||
|
||||
|
||||
def _split_section(text: str, chunk_size: int, overlap: int) -> list[str]:
    """Cut one section into overlapping chunks along paragraph boundaries.

    Paragraphs are the non-blank lines of ``text``. They are accumulated until
    adding the next one would exceed ``chunk_size`` estimated tokens; the
    chunk is then emitted and the next chunk starts with as many trailing
    paragraphs of the previous one as fit within ``overlap`` estimated tokens.
    Token counts come from ``_estimate_tokens``.

    NOTE: a single paragraph larger than ``chunk_size`` is not split further;
    it is emitted as one oversized chunk.

    Returns:
        Chunk texts (paragraphs joined with newlines); empty for blank input.
    """
    if not text.strip():
        return []

    stripped_lines = (raw.strip() for raw in text.split("\n"))
    paragraphs = [line for line in stripped_lines if line]

    result: list[str] = []
    window: list[str] = []
    window_tokens = 0

    for paragraph in paragraphs:
        cost = _estimate_tokens(paragraph)

        if window and window_tokens + cost > chunk_size:
            result.append("\n".join(window))

            # Count how many trailing paragraphs fit in the overlap budget.
            kept = 0
            carried_tokens = 0
            for previous in reversed(window):
                previous_cost = _estimate_tokens(previous)
                if carried_tokens + previous_cost > overlap:
                    break
                carried_tokens += previous_cost
                kept += 1

            window = window[len(window) - kept:]
            window_tokens = carried_tokens

        window.append(paragraph)
        window_tokens += cost

    if window:
        result.append("\n".join(window))

    return result
|
||||
|
||||
|
||||
def _estimate_tokens(text: str) -> int:
|
||||
"""Rough token estimate for Hebrew text (~1.5 chars per token)."""
|
||||
return max(1, len(text) // 2)
|
||||
374
mcp-server/src/din_leumi/services/db.py
Normal file
374
mcp-server/src/din_leumi/services/db.py
Normal file
@@ -0,0 +1,374 @@
|
||||
"""Database service - asyncpg connection pool and queries for din-leumi."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
from datetime import date
|
||||
from uuid import UUID, uuid4
|
||||
|
||||
import asyncpg
|
||||
from pgvector.asyncpg import register_vector
|
||||
|
||||
from din_leumi import config
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_pool: asyncpg.Pool | None = None
|
||||
|
||||
|
||||
async def get_pool() -> asyncpg.Pool:
    """Return the shared asyncpg pool, creating it on first use.

    Before the pool is created, a short-lived connection makes sure the
    ``vector`` and ``uuid-ossp`` extensions exist, so that pooled connections
    can register the vector codec in ``_init_connection``.

    NOTE: first-use initialisation is not guarded by a lock; concurrent first
    calls may each run the bootstrap.
    """
    global _pool
    if _pool is not None:
        return _pool

    bootstrap = await asyncpg.connect(config.POSTGRES_URL)
    try:
        await bootstrap.execute('CREATE EXTENSION IF NOT EXISTS vector')
        await bootstrap.execute('CREATE EXTENSION IF NOT EXISTS "uuid-ossp"')
    finally:
        # Always release the bootstrap connection, even if an extension
        # statement fails (e.g. insufficient privileges).
        await bootstrap.close()

    _pool = await asyncpg.create_pool(
        config.POSTGRES_URL,
        min_size=2,
        max_size=10,
        init=_init_connection,
    )
    return _pool
|
||||
|
||||
|
||||
async def _init_connection(conn: asyncpg.Connection) -> None:
    """Pool ``init`` hook: register the pgvector codec on a new connection."""
    await register_vector(conn)
|
||||
|
||||
|
||||
async def close_pool() -> None:
    """Close the shared pool, if one exists, and forget it."""
    global _pool
    if not _pool:
        return
    await _pool.close()
    _pool = None
|
||||
|
||||
|
||||
# ── Schema ──────────────────────────────────────────────────────────
|
||||
|
||||
# DDL for the two tables (decisions and their embedded chunks) and the
# non-vector indexes. All statements are idempotent (IF NOT EXISTS).
SCHEMA_SQL = """

CREATE TABLE IF NOT EXISTS decisions (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    title TEXT NOT NULL,
    file_path TEXT NOT NULL,
    extracted_text TEXT DEFAULT '',
    extraction_status TEXT DEFAULT 'pending',

    court TEXT DEFAULT '',
    decision_date DATE,
    case_number TEXT DEFAULT '',
    judge TEXT DEFAULT '',
    parties_appellant TEXT DEFAULT '',
    parties_respondent TEXT DEFAULT 'המוסד לביטוח לאומי',
    topics JSONB DEFAULT '[]',
    outcome TEXT DEFAULT '',
    summary TEXT DEFAULT '',

    page_count INTEGER,
    created_at TIMESTAMPTZ DEFAULT now(),
    updated_at TIMESTAMPTZ DEFAULT now()
);

CREATE TABLE IF NOT EXISTS decision_chunks (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    decision_id UUID REFERENCES decisions(id) ON DELETE CASCADE,
    chunk_index INTEGER NOT NULL,
    content TEXT NOT NULL,
    section_type TEXT DEFAULT 'other',
    embedding vector(1024),
    page_number INTEGER,
    created_at TIMESTAMPTZ DEFAULT now()
);

CREATE INDEX IF NOT EXISTS idx_chunks_decision ON decision_chunks(decision_id);
CREATE INDEX IF NOT EXISTS idx_decisions_court ON decisions(court);
CREATE INDEX IF NOT EXISTS idx_decisions_date ON decisions(decision_date);
CREATE INDEX IF NOT EXISTS idx_decisions_case_number ON decisions(case_number);
CREATE INDEX IF NOT EXISTS idx_decisions_topics ON decisions USING gin(topics);
"""
|
||||
|
||||
# The IVFFlat index is built from existing rows, so it is created only once
# a first batch of chunks has been stored (see the callers' row-count check).
IVFFLAT_INDEX_SQL = """
CREATE INDEX IF NOT EXISTS idx_chunks_embedding
    ON decision_chunks USING ivfflat (embedding vector_cosine_ops)
    WITH (lists = 100);
"""
|
||||
|
||||
|
||||
async def init_schema() -> None:
    """Create tables and indexes if missing.

    The IVFFlat vector index is only attempted when more than 100 chunks are
    already stored.
    """
    pool = await get_pool()
    async with pool.acquire() as conn:
        await conn.execute(SCHEMA_SQL)

        # IVFFlat needs existing rows, so gate it on the current chunk count.
        chunk_total = await conn.fetchval("SELECT count(*) FROM decision_chunks")
        if chunk_total and chunk_total > 100:
            await conn.execute(IVFFLAT_INDEX_SQL)
            logger.info("IVFFlat index created (%d chunks)", chunk_total)

    logger.info("Database schema initialized")
|
||||
|
||||
|
||||
async def ensure_ivfflat_index() -> None:
    """Issue the IVFFlat index DDL once more than 100 chunks are stored.

    The statement uses ``IF NOT EXISTS``, so repeated calls are harmless.
    """
    pool = await get_pool()
    async with pool.acquire() as conn:
        chunk_total = await conn.fetchval("SELECT count(*) FROM decision_chunks")
        if not chunk_total or chunk_total <= 100:
            return
        await conn.execute(IVFFLAT_INDEX_SQL)
|
||||
|
||||
|
||||
# ── Decision CRUD ───────────────────────────────────────────────────
|
||||
|
||||
async def create_decision(
    title: str,
    file_path: str,
    court: str = "",
    decision_date: date | None = None,
    case_number: str = "",
    judge: str = "",
    parties_appellant: str = "",
    parties_respondent: str = "המוסד לביטוח לאומי",
    topics: list[str] | None = None,
    outcome: str = "",
) -> dict:
    """Insert a new decision row and return it as a dict.

    The id is generated client-side; ``topics`` is stored as a JSON array
    (an empty array when omitted). The returned dict is the freshly inserted
    row as read back by ``get_decision``.
    """
    new_id = uuid4()
    # The JSONB column receives its value as JSON text.
    topics_json = json.dumps(topics or [])
    row_values = (
        new_id,
        title,
        file_path,
        court,
        decision_date,
        case_number,
        judge,
        parties_appellant,
        parties_respondent,
        topics_json,
        outcome,
    )
    insert_sql = """INSERT INTO decisions (id, title, file_path, court, decision_date,
                case_number, judge, parties_appellant, parties_respondent,
                topics, outcome)
               VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)"""

    pool = await get_pool()
    async with pool.acquire() as conn:
        await conn.execute(insert_sql, *row_values)
    return await get_decision(new_id)
|
||||
|
||||
|
||||
async def get_decision(decision_id: UUID) -> dict | None:
    """Fetch one decision by primary key; ``None`` when it does not exist."""
    pool = await get_pool()
    async with pool.acquire() as conn:
        record = await conn.fetchrow(
            "SELECT * FROM decisions WHERE id = $1", decision_id
        )
    return _row_to_decision(record) if record is not None else None
|
||||
|
||||
|
||||
async def get_decision_by_case_number(case_number: str) -> dict | None:
    """Fetch a decision by exact case number; ``None`` when there is no match.

    If several rows share the case number, whichever row the database returns
    first is used.
    """
    pool = await get_pool()
    async with pool.acquire() as conn:
        record = await conn.fetchrow(
            "SELECT * FROM decisions WHERE case_number = $1", case_number
        )
    return _row_to_decision(record) if record is not None else None
|
||||
|
||||
|
||||
async def list_decisions(
    court: str = "",
    topic: str = "",
    judge: str = "",
    date_from: date | None = None,
    date_to: date | None = None,
    outcome: str = "",
    limit: int = 50,
) -> list[dict]:
    """List decisions, newest first, with optional metadata filters.

    Empty / ``None`` filters are ignored. ``court`` and ``judge`` are
    case-insensitive substring matches, ``topic`` must be contained in the
    decision's topics array, ``outcome`` is an exact match, and the date
    bounds are inclusive. The full extracted text is not selected.
    """
    # (supplied?, left-hand side, bound value, cast suffix) for every filter.
    optional_filters = (
        (court, "court ILIKE", f"%{court}%", ""),
        (topic, "topics @>", json.dumps([topic]), "::jsonb"),
        (judge, "judge ILIKE", f"%{judge}%", ""),
        (date_from, "decision_date >=", date_from, ""),
        (date_to, "decision_date <=", date_to, ""),
        (outcome, "outcome =", outcome, ""),
    )

    clauses: list[str] = []
    args: list = []
    for supplied, lhs, value, cast in optional_filters:
        if not supplied:
            continue
        args.append(value)
        # Placeholder number equals the value's 1-based position in args.
        clauses.append(f"{lhs} ${len(args)}{cast}")

    where = f"WHERE {' AND '.join(clauses)}" if clauses else ""
    args.append(limit)
    limit_slot = len(args)

    sql = f"""
        SELECT id, title, court, decision_date, case_number, judge,
               parties_appellant, parties_respondent, topics, outcome, summary,
               extraction_status, page_count, created_at
        FROM decisions
        {where}
        ORDER BY decision_date DESC NULLS LAST, created_at DESC
        LIMIT ${limit_slot}
    """

    pool = await get_pool()
    async with pool.acquire() as conn:
        records = await conn.fetch(sql, *args)
    return [_row_to_decision(record) for record in records]
|
||||
|
||||
|
||||
async def update_decision(decision_id: UUID, **fields) -> dict | None:
    """Update the given columns of a decision and return the refreshed row.

    Args:
        decision_id: Primary key of the decision to update.
        **fields: Column name -> new value. ``topics`` is serialised to JSON.
            ``updated_at`` is always set to ``now()``.

    Returns:
        The decision after the update, or ``None`` if it does not exist.
        With no fields, the current row is returned unchanged.

    Raises:
        ValueError: If a field name is not a plain ASCII identifier. Column
            names are spliced into the SQL text (they cannot be bound as
            parameters), so anything else could inject SQL.
    """
    if not fields:
        return await get_decision(decision_id)

    # Validate before touching the database: names end up in the SQL string.
    unsafe = [
        name for name in fields
        if not (name.isascii() and name.isidentifier())
    ]
    if unsafe:
        raise ValueError(
            f"Invalid column name(s): {', '.join(repr(name) for name in unsafe)}"
        )

    assignments: list[str] = []
    values: list = []
    # $1 is reserved for the id, so field placeholders start at $2.
    for position, (column, value) in enumerate(fields.items(), start=2):
        if column == "topics":
            value = json.dumps(value)
        assignments.append(f"{column} = ${position}")
        values.append(value)
    assignments.append("updated_at = now()")

    sql = f"UPDATE decisions SET {', '.join(assignments)} WHERE id = $1"
    pool = await get_pool()
    async with pool.acquire() as conn:
        await conn.execute(sql, decision_id, *values)
    return await get_decision(decision_id)
|
||||
|
||||
|
||||
async def delete_decision(decision_id: UUID) -> bool:
    """Delete a decision; return ``True`` only if exactly one row was removed.

    Chunks of the decision are removed by the ``ON DELETE CASCADE`` foreign
    key declared in the schema.
    """
    pool = await get_pool()
    async with pool.acquire() as conn:
        status = await conn.execute(
            "DELETE FROM decisions WHERE id = $1", decision_id
        )
    # asyncpg returns the command tag, e.g. "DELETE 1" or "DELETE 0".
    return status == "DELETE 1"
|
||||
|
||||
|
||||
def _row_to_decision(row: asyncpg.Record) -> dict:
|
||||
d = dict(row)
|
||||
if isinstance(d.get("topics"), str):
|
||||
d["topics"] = json.loads(d["topics"])
|
||||
for field in ("id",):
|
||||
if field in d and d[field] is not None:
|
||||
d[field] = str(d[field])
|
||||
return d
|
||||
|
||||
|
||||
# ── Chunks & Vectors ───────────────────────────────────────────────
|
||||
|
||||
async def store_chunks(
    decision_id: UUID,
    chunks: list[dict],
) -> int:
    """Replace all stored chunks of a decision with the given ones.

    The delete and the inserts run in a single transaction, so a failure
    part-way through leaves the previously stored chunks intact instead of a
    partially written (or empty) set.

    Args:
        decision_id: Decision the chunks belong to.
        chunks: Dicts with required keys ``chunk_index``, ``content`` and
            ``embedding``; ``section_type`` (default "other") and
            ``page_number`` are optional.

    Returns:
        Number of chunks stored.

    Raises:
        KeyError: If a chunk lacks a required key (raised before any
            database change is made).
    """
    # Build every row up front so malformed input fails before the delete.
    rows = [
        (
            decision_id,
            chunk["chunk_index"],
            chunk["content"],
            chunk.get("section_type", "other"),
            chunk["embedding"],
            chunk.get("page_number"),
        )
        for chunk in chunks
    ]

    pool = await get_pool()
    async with pool.acquire() as conn:
        async with conn.transaction():
            await conn.execute(
                "DELETE FROM decision_chunks WHERE decision_id = $1", decision_id
            )
            if rows:
                await conn.executemany(
                    """INSERT INTO decision_chunks
                       (decision_id, chunk_index, content, section_type, embedding, page_number)
                       VALUES ($1, $2, $3, $4, $5, $6)""",
                    rows,
                )
    return len(rows)
|
||||
|
||||
|
||||
async def search_similar(
    query_embedding: list[float],
    limit: int = 10,
    court: str = "",
    topic: str = "",
    date_from: date | None = None,
    date_to: date | None = None,
    outcome: str = "",
) -> list[dict]:
    """Find the chunks closest to a query vector by cosine distance.

    Optional filters apply to the parent decision's metadata: ``court`` is a
    case-insensitive substring match, ``topic`` must be contained in the
    topics array, ``outcome`` is an exact match and the date bounds are
    inclusive. Empty / ``None`` filters are ignored.

    Returns:
        One dict per chunk, best match first, with the chunk content, selected
        decision metadata, ``decision_id`` as ``str`` and ``score``
        (1 - cosine distance).
    """
    # $1 and $2 are fixed (query vector, limit); filters continue from $3.
    args: list = [query_embedding, limit]
    clauses: list[str] = []
    optional_filters = (
        (court, "d.court ILIKE", f"%{court}%", ""),
        (topic, "d.topics @>", json.dumps([topic]), "::jsonb"),
        (date_from, "d.decision_date >=", date_from, ""),
        (date_to, "d.decision_date <=", date_to, ""),
        (outcome, "d.outcome =", outcome, ""),
    )
    for supplied, lhs, value, cast in optional_filters:
        if not supplied:
            continue
        args.append(value)
        clauses.append(f"{lhs} ${len(args)}{cast}")

    where = f"WHERE {' AND '.join(clauses)}" if clauses else ""

    sql = f"""
        SELECT dc.content, dc.section_type, dc.page_number,
               dc.decision_id,
               d.title, d.case_number, d.court, d.decision_date,
               d.judge, d.outcome,
               1 - (dc.embedding <=> $1) AS score
        FROM decision_chunks dc
        JOIN decisions d ON d.id = dc.decision_id
        {where}
        ORDER BY dc.embedding <=> $1
        LIMIT $2
    """

    pool = await get_pool()
    async with pool.acquire() as conn:
        records = await conn.fetch(sql, *args)

    hits: list[dict] = []
    for record in records:
        hit = dict(record)
        if hit.get("decision_id"):
            # UUIDs are returned as strings for serialisation by callers.
            hit["decision_id"] = str(hit["decision_id"])
        hits.append(hit)
    return hits
|
||||
|
||||
|
||||
# ── Stats ──────────────────────────────────────────────────────────
|
||||
|
||||
async def get_stats() -> dict:
    """Collect summary statistics about the stored decisions.

    Returns:
        Dict with total and completed decision counts, the total chunk count,
        the ten courts with the most decisions, and the earliest / latest
        decision dates as strings (``None`` when no dated decision exists).
    """
    pool = await get_pool()
    async with pool.acquire() as conn:
        total_decisions = await conn.fetchval("SELECT count(*) FROM decisions")
        total_chunks = await conn.fetchval("SELECT count(*) FROM decision_chunks")
        completed = await conn.fetchval(
            "SELECT count(*) FROM decisions WHERE extraction_status = 'completed'"
        )
        court_rows = await conn.fetch(
            "SELECT court, count(*) as cnt FROM decisions WHERE court != '' GROUP BY court ORDER BY cnt DESC LIMIT 10"
        )
        date_row = await conn.fetchrow(
            "SELECT min(decision_date) as earliest, max(decision_date) as latest FROM decisions WHERE decision_date IS NOT NULL"
        )

    if date_row:
        earliest = date_row["earliest"]
        latest = date_row["latest"]
        date_span = {
            "earliest": str(earliest) if earliest else None,
            "latest": str(latest) if latest else None,
        }
    else:
        date_span = None

    per_court = [
        {"court": record["court"], "count": record["cnt"]}
        for record in court_rows
    ]
    return {
        "total_decisions": total_decisions,
        "completed_decisions": completed,
        "total_chunks": total_chunks,
        "courts": per_court,
        "date_range": date_span,
    }
|
||||
55
mcp-server/src/din_leumi/services/embeddings.py
Normal file
55
mcp-server/src/din_leumi/services/embeddings.py
Normal file
@@ -0,0 +1,55 @@
|
||||
"""Embedding service using Voyage AI API."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
|
||||
import voyageai
|
||||
|
||||
from din_leumi import config
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_client: voyageai.Client | None = None
|
||||
|
||||
|
||||
def _get_client() -> voyageai.Client:
    """Return the module-wide Voyage AI client, creating it on first use."""
    global _client
    if _client is not None:
        return _client
    _client = voyageai.Client(api_key=config.VOYAGE_API_KEY)
    return _client
|
||||
|
||||
|
||||
async def embed_texts(texts: list[str], input_type: str = "document") -> list[list[float]]:
    """Embed a list of texts using Voyage AI.

    Texts are sent in batches of at most 128 per request. The Voyage client
    is synchronous, so each request runs in a worker thread to keep the event
    loop responsive while waiting on the network.

    Args:
        texts: Texts to embed (any length; batching is handled here).
        input_type: "document" for indexing, "query" for search queries.

    Returns:
        One embedding vector per input text, in input order; an empty list
        for empty input.
    """
    # Function-scope import: this module does not import asyncio at file level.
    import asyncio

    if not texts:
        return []

    batch_limit = 128  # per-request batch size used by this module
    client = _get_client()
    vectors: list[list[float]] = []

    for start in range(0, len(texts), batch_limit):
        batch = texts[start : start + batch_limit]
        response = await asyncio.to_thread(
            client.embed,
            batch,
            model=config.VOYAGE_MODEL,
            input_type=input_type,
        )
        vectors.extend(response.embeddings)

    return vectors
|
||||
|
||||
|
||||
async def embed_query(query: str) -> list[float]:
    """Return the embedding of one search query (uses input_type "query")."""
    vectors = await embed_texts([query], input_type="query")
    return vectors[0]
|
||||
126
mcp-server/src/din_leumi/services/extractor.py
Normal file
126
mcp-server/src/din_leumi/services/extractor.py
Normal file
@@ -0,0 +1,126 @@
|
||||
"""Text extraction from PDF, DOCX, and RTF files.
|
||||
|
||||
Primary PDF extraction: Claude Vision API (for scanned documents).
|
||||
Fallback: PyMuPDF direct text extraction (for born-digital PDFs).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
import anthropic
|
||||
import fitz # PyMuPDF
|
||||
from docx import Document as DocxDocument
|
||||
from striprtf.striprtf import rtf_to_text
|
||||
|
||||
from din_leumi import config
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_anthropic_client: anthropic.Anthropic | None = None
|
||||
|
||||
|
||||
def _get_anthropic() -> anthropic.Anthropic:
    """Return the module-wide Anthropic client, creating it on first use."""
    global _anthropic_client
    if _anthropic_client is not None:
        return _anthropic_client
    _anthropic_client = anthropic.Anthropic(api_key=config.ANTHROPIC_API_KEY)
    return _anthropic_client
|
||||
|
||||
|
||||
async def extract_text(file_path: str) -> tuple[str, int]:
    """Extract the text of a document, choosing the extractor by file suffix.

    Supported suffixes (case-insensitive): .pdf, .docx, .rtf, .txt.

    Returns:
        Tuple of (extracted_text, page_count); page_count is 0 for anything
        that is not a PDF.

    Raises:
        ValueError: For any other file suffix.
    """
    path = Path(file_path)
    suffix = path.suffix.lower()

    if suffix == ".pdf":
        return await _extract_pdf(path)
    if suffix == ".docx":
        return _extract_docx(path), 0
    if suffix == ".rtf":
        return _extract_rtf(path), 0
    if suffix == ".txt":
        return path.read_text(encoding="utf-8"), 0

    raise ValueError(f"Unsupported file type: {suffix}")
|
||||
|
||||
|
||||
async def _extract_pdf(path: Path) -> tuple[str, int]:
    """Extract text from a PDF, page by page.

    Each page is first read with PyMuPDF's direct text extraction. A page
    yielding 50 characters or fewer is treated as scanned: it is rendered to
    a 200-dpi PNG and sent to Claude Vision for OCR.

    Returns:
        Tuple of (page texts joined by blank lines, page count).
    """
    doc = fitz.open(str(path))
    try:
        page_count = len(doc)
        pages_text: list[str] = []

        for page_num in range(page_count):
            page = doc[page_num]
            # Try direct text extraction first.
            text = page.get_text().strip()

            if len(text) > 50:
                # Sufficient text found - born-digital page.
                pages_text.append(text)
                logger.debug(
                    "Page %d: direct text extraction (%d chars)",
                    page_num + 1,
                    len(text),
                )
            else:
                # Likely scanned - use Claude Vision.
                logger.info("Page %d: using Claude Vision OCR", page_num + 1)
                pix = page.get_pixmap(dpi=200)
                img_bytes = pix.tobytes("png")
                ocr_text = await _ocr_with_claude(img_bytes, page_num + 1)
                pages_text.append(ocr_text)
    finally:
        # Release the file handle even when rendering or OCR fails.
        doc.close()

    return "\n\n".join(pages_text), page_count
|
||||
|
||||
|
||||
async def _ocr_with_claude(image_bytes: bytes, page_num: int) -> str:
    """OCR a single page image using the Claude Vision API.

    The Anthropic client used here is synchronous, so the request runs in a
    worker thread to avoid blocking the event loop.

    Args:
        image_bytes: PNG-encoded page image.
        page_num: 1-based page number (not sent to the API; kept for callers).

    Returns:
        The extracted text: all text blocks of the response concatenated, or
        an empty string if the response contains none.
    """
    # Function-scope import: this module does not import asyncio at file level.
    import asyncio

    client = _get_anthropic()
    b64_image = base64.b64encode(image_bytes).decode("utf-8")

    # Hebrew instruction: extract all text from this Hebrew legal document,
    # keep the paragraph structure, return only the extracted text.
    prompt = (
        "חלץ את כל הטקסט מהתמונה הזו. זהו מסמך משפטי בעברית. "
        "שמור על מבנה הפסקאות המקורי. "
        "החזר רק את הטקסט המחולץ, ללא הערות נוספות."
    )
    user_content = [
        {
            "type": "image",
            "source": {
                "type": "base64",
                "media_type": "image/png",
                "data": b64_image,
            },
        },
        {
            "type": "text",
            "text": prompt,
        },
    ]

    message = await asyncio.to_thread(
        client.messages.create,
        model="claude-sonnet-4-20250514",
        max_tokens=4096,
        messages=[{"role": "user", "content": user_content}],
    )

    # Join text blocks instead of indexing content[0], which raises on an
    # empty response and ignores any further text blocks.
    return "".join(
        block.text
        for block in message.content
        if getattr(block, "type", None) == "text"
    )
|
||||
|
||||
|
||||
def _extract_docx(path: Path) -> str:
    """Return the non-blank paragraphs of a DOCX file, separated by blank lines."""
    document = DocxDocument(str(path))
    kept: list[str] = []
    for paragraph in document.paragraphs:
        if paragraph.text.strip():
            kept.append(paragraph.text)
    return "\n\n".join(kept)
|
||||
|
||||
|
||||
def _extract_rtf(path: Path) -> str:
    """Return the plain text of an RTF file.

    The file is read as UTF-8 with undecodable bytes replaced, then converted
    with ``rtf_to_text``.
    """
    raw_rtf = path.read_text(encoding="utf-8", errors="replace")
    plain_text = rtf_to_text(raw_rtf)
    return plain_text
|
||||
82
mcp-server/src/din_leumi/services/processor.py
Normal file
82
mcp-server/src/din_leumi/services/processor.py
Normal file
@@ -0,0 +1,82 @@
|
||||
"""Decision processing pipeline: extract → chunk → embed → store."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from uuid import UUID
|
||||
|
||||
from din_leumi.services import chunker, db, embeddings, extractor
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def process_decision(decision_id: UUID) -> dict:
    """Run the full processing pipeline for a decision.

    1. Extract text from the decision's file
    2. Split the text into chunks
    3. Generate embeddings
    4. Store chunks + embeddings in the DB

    ``extraction_status`` is set to "processing" at the start and to
    "completed" or "failed" at the end. Creating the IVFFlat index afterwards
    is best-effort: a failure there is logged and does not change the status
    of a decision whose chunks were already stored.

    Returns:
        Processing summary: ``{"status": "completed", "chunks", "pages",
        "text_length"}`` on success, ``{"status": "completed", "chunks": 0,
        "message"}`` when there is no text, or ``{"status": "failed",
        "error"}`` when a pipeline step raises.

    Raises:
        ValueError: If the decision does not exist.
    """
    decision = await db.get_decision(decision_id)
    if not decision:
        raise ValueError(f"Decision {decision_id} not found")

    await db.update_decision(decision_id, extraction_status="processing")

    try:
        # Step 1: Extract text
        logger.info("Extracting text from %s", decision["file_path"])
        text, page_count = await extractor.extract_text(decision["file_path"])

        await db.update_decision(
            decision_id,
            extracted_text=text,
            page_count=page_count,
        )

        # Step 2: Chunk
        logger.info("Chunking decision (%d chars)", len(text))
        chunks = chunker.chunk_document(text)

        if not chunks:
            await db.update_decision(decision_id, extraction_status="completed")
            return {"status": "completed", "chunks": 0, "message": "No text to chunk"}

        # Step 3: Embed
        logger.info("Generating embeddings for %d chunks", len(chunks))
        texts = [c.content for c in chunks]
        embs = await embeddings.embed_texts(texts, input_type="document")

        # zip() would silently drop chunks if the service returned fewer
        # vectors than requested, so fail loudly instead.
        if len(embs) != len(chunks):
            raise RuntimeError(
                f"Embedding count mismatch: expected {len(chunks)}, got {len(embs)}"
            )

        # Step 4: Store
        chunk_dicts = [
            {
                "content": c.content,
                "section_type": c.section_type,
                "embedding": emb,
                "page_number": c.page_number,
                "chunk_index": c.chunk_index,
            }
            for c, emb in zip(chunks, embs)
        ]

        stored = await db.store_chunks(decision_id, chunk_dicts)
        await db.update_decision(decision_id, extraction_status="completed")

        # Try to create the IVFFlat index if we have enough data. This is an
        # optimisation only, so a failure must not mark the decision failed.
        try:
            await db.ensure_ivfflat_index()
        except Exception:
            logger.warning(
                "IVFFlat index creation failed; continuing without it",
                exc_info=True,
            )

        logger.info("Decision processed: %d chunks stored", stored)
        return {
            "status": "completed",
            "chunks": stored,
            "pages": page_count,
            "text_length": len(text),
        }

    except Exception as e:
        logger.exception("Decision processing failed: %s", e)
        await db.update_decision(decision_id, extraction_status="failed")
        return {"status": "failed", "error": str(e)}
|
||||
Reference in New Issue
Block a user