"""Ezer Mishpati — Web upload interface for legal documents."""

from __future__ import annotations

import asyncio
import json
import logging
import re
import shutil
import subprocess
import sys
import time
from contextlib import asynccontextmanager
from pathlib import Path
from uuid import UUID, uuid4

# Allow importing legal_mcp from the MCP server source
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "mcp-server" / "src"))
# Allow importing din_leumi from its MCP server source
sys.path.insert(0, str(Path.home() / "din-leumi" / "mcp-server" / "src"))

from fastapi import FastAPI, File, HTTPException, UploadFile
from fastapi.responses import FileResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel

from legal_mcp import config
from legal_mcp.services import chunker, db, embeddings, extractor, processor
from legal_mcp.tools import (
    cases as cases_tools,
    drafting as drafting_tools,
    search as search_tools,
    workflow as workflow_tools,
)

# Din Leumi imports (aliased to avoid collisions with legal_mcp names)
from din_leumi import config as dl_config
from din_leumi.services import db as dl_db
from din_leumi.services import extractor as dl_extractor
from din_leumi.services import processor as dl_processor

import anthropic

logger = logging.getLogger(__name__)

UPLOAD_DIR = config.DATA_DIR / "uploads"
ALLOWED_EXTENSIONS = {".pdf", ".docx", ".rtf", ".txt"}
MAX_FILE_SIZE = 50 * 1024 * 1024  # 50 MB

# In-memory progress tracking, keyed by task ID
_progress: dict[str, dict] = {}

# The event loop keeps only weak references to tasks, so hold strong
# references here until each background job finishes.
_background_tasks: set[asyncio.Task] = set()


def _spawn(coro) -> None:
    """Run a coroutine in the background, keeping its task alive until it completes."""
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)


@asynccontextmanager
async def lifespan(app: FastAPI):
    UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
    dl_config.DECISIONS_DIR.mkdir(parents=True, exist_ok=True)
    await db.init_schema()
    await dl_db.init_schema()
    yield
    await db.close_pool()
    await dl_db.close_pool()


# Title: "Upload legal documents"
app = FastAPI(title="העלאת מסמכים משפטיים", lifespan=lifespan)

STATIC_DIR = Path(__file__).parent / "static"


# ── API Endpoints ──────────────────────────────────────────────────

@app.get("/")
async def index():
    return FileResponse(STATIC_DIR / "index.html")


@app.post("/api/upload")
async def upload_file(file: UploadFile = File(...)):
    """Upload a file to the temporary uploads directory."""
    if not file.filename:
        raise HTTPException(400, "No filename provided")

    # Validate extension
    ext = Path(file.filename).suffix.lower()
    if ext not in ALLOWED_EXTENSIONS:
        raise HTTPException(
            400, f"Unsupported file type: {ext}. Allowed: {', '.join(sorted(ALLOWED_EXTENSIONS))}"
        )

    # Sanitize filename: keep word chars, Hebrew letters, spaces, dots, hyphens, parentheses
    safe_name = re.sub(r"[^\w\u0590-\u05FF\s.\-()]", "", Path(file.filename).stem)
    if not safe_name:
        safe_name = "document"

    # Timestamp prefix avoids collisions; it is stripped again when the file is classified
    timestamp = int(time.time())
    filename = f"{timestamp}_{safe_name}{ext}"

    # Read and validate size
    content = await file.read()
    if len(content) > MAX_FILE_SIZE:
        raise HTTPException(400, f"File too large. Max: {MAX_FILE_SIZE // (1024 * 1024)}MB")

    dest = UPLOAD_DIR / filename
    dest.write_bytes(content)

    return {
        "filename": filename,
        "original_name": file.filename,
        "size": len(content),
    }


@app.get("/api/uploads")
async def list_uploads():
    """List files in the uploads (pending) directory, newest first."""
    if not UPLOAD_DIR.exists():
        return []
    files = []
    for f in sorted(UPLOAD_DIR.iterdir(), key=lambda p: p.stat().st_mtime, reverse=True):
        if f.is_file() and f.suffix.lower() in ALLOWED_EXTENSIONS:
            stat = f.stat()
            files.append({
                "filename": f.name,
                "size": stat.st_size,
                "uploaded_at": stat.st_mtime,
            })
    return files


@app.delete("/api/uploads/{filename}")
async def delete_upload(filename: str):
    """Remove a file from the uploads directory."""
    path = UPLOAD_DIR / filename
    # is_file() also rejects directories such as "..", which exists() would accept
    if not path.is_file() or not path.parent.samefile(UPLOAD_DIR):
        raise HTTPException(404, "File not found")
    path.unlink()
    return {"deleted": filename}


class ClassifyRequest(BaseModel):
    filename: str
    category: str  # "training" or "case"
    # For case documents
    case_number: str = ""
    doc_type: str = "appeal"
    title: str = ""
    # For training documents
    decision_number: str = ""
    decision_date: str = ""
    subject_categories: list[str] = []


@app.post("/api/classify")
async def classify_file(req: ClassifyRequest):
    """Classify a pending file and start processing it in the background."""
    source = UPLOAD_DIR / req.filename
    if not source.is_file() or not source.parent.samefile(UPLOAD_DIR):
        raise HTTPException(404, "File not found in uploads")

    if req.category not in ("training", "case"):
        raise HTTPException(400, "Category must be 'training' or 'case'")

    if req.category == "case" and not req.case_number:
        raise HTTPException(400, "case_number required for case documents")

    task_id = str(uuid4())
    _progress[task_id] = {"status": "queued", "filename": req.filename}
    _spawn(_process_file(task_id, source, req))

    return {"task_id": task_id}


@app.get("/api/progress/{task_id}")
async def progress_stream(task_id: str):
    """SSE stream of processing progress."""
    if task_id not in _progress:
        raise HTTPException(404, "Task not found")

    async def event_stream():
        while True:
            data = _progress.get(task_id)
            if data is None:
                # Record already cleaned up (e.g. by another stream for the same task)
                break
            yield f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
            if data.get("status") in ("completed", "failed"):
                break
            await asyncio.sleep(1)
        # Clean up after a delay
        await asyncio.sleep(30)
        _progress.pop(task_id, None)

    return StreamingResponse(event_stream(), media_type="text/event-stream")


@app.get("/health")
async def health():
    return {"status": "ok"}


@app.get("/api/cases")
async def list_cases():
    """List existing cases for the dropdown."""
    cases = await db.list_cases()
    return [
        {
            "case_number": c["case_number"],
            "title": c["title"],
            "status": c["status"],
        }
        for c in cases
    ]


# ── Paperclip Integration API ─────────────────────────────────────

class CaseCreateRequest(BaseModel):
    case_number: str
    title: str
    appellants: list[str] | None = None
    respondents: list[str] | None = None
    subject: str = ""
    property_address: str = ""
    permit_number: str = ""
    committee_type: str = "ועדה מקומית"  # "local committee"
    hearing_date: str = ""
    notes: str = ""
    expected_outcome: str = ""


class CaseUpdateRequest(BaseModel):
    status: str = ""
    title: str = ""
    subject: str = ""
    notes: str = ""
    hearing_date: str = ""
    decision_date: str = ""
    tags: list[str] | None = None
    expected_outcome: str = ""


@app.post("/api/cases/create")
async def api_case_create(req: CaseCreateRequest):
    """Create a new appeal case."""
    result = await cases_tools.case_create(
        case_number=req.case_number,
        title=req.title,
        appellants=req.appellants,
        respondents=req.respondents,
        subject=req.subject,
        property_address=req.property_address,
        permit_number=req.permit_number,
        committee_type=req.committee_type,
        hearing_date=req.hearing_date,
        notes=req.notes,
        expected_outcome=req.expected_outcome,
    )
    return json.loads(result)


@app.get("/api/cases/{case_number}/details")
async def api_case_get(case_number: str):
    """Get full case details including documents."""
    result = await cases_tools.case_get(case_number)
    try:
        return json.loads(result)
    except json.JSONDecodeError:
        raise HTTPException(404, result)


@app.put("/api/cases/{case_number}")
async def api_case_update(case_number: str, req: CaseUpdateRequest):
    """Update case details."""
    result = await cases_tools.case_update(
        case_number=case_number,
        status=req.status,
        title=req.title,
        subject=req.subject,
        notes=req.notes,
        hearing_date=req.hearing_date,
        decision_date=req.decision_date,
        tags=req.tags,
        expected_outcome=req.expected_outcome,
    )
    try:
        return json.loads(result)
    except json.JSONDecodeError:
        raise HTTPException(404, result)


@app.get("/api/cases/{case_number}/status")
async def api_case_status(case_number: str):
    """Get full workflow status for a case."""
    result = await workflow_tools.workflow_status(case_number)
    try:
        return json.loads(result)
    except json.JSONDecodeError:
        raise HTTPException(404, result)


@app.get("/api/search")
async def api_search(query: str, limit: int = 10, section_type: str = ""):
    """Semantic search across decisions and documents."""
    result = await search_tools.search_decisions(query, limit, section_type)
    try:
        return json.loads(result)
    except json.JSONDecodeError:
        return {"message": result}


@app.get("/api/cases/{case_number}/search")
async def api_case_search(case_number: str, query: str, limit: int = 10):
    """Semantic search within a specific case's documents."""
    result = await search_tools.search_case_documents(case_number, query, limit)
    try:
        return json.loads(result)
    except json.JSONDecodeError:
        return {"message": result}


@app.get("/api/cases/{case_number}/template")
async def api_case_template(case_number: str):
    """Get outcome-aware decision template for a case."""
    result = await drafting_tools.get_decision_template(case_number)
    # A result starting with "תיק" ("case") is treated as the tool's case-not-found message
    if result.startswith("תיק"):
        raise HTTPException(404, result)
    return {"template": result}


@app.get("/api/processing-status")
async def api_processing_status():
    """Get overall processing status."""
    result = await workflow_tools.processing_status()
    return json.loads(result)


# ── Workflow API — outcome, direction, claims, QA, learning ──────

class OutcomeRequest(BaseModel):
    outcome: str  # rejection / full_acceptance / partial_acceptance
    reasoning: str = ""


class DirectionRequest(BaseModel):
    direction_doc: dict  # JSON document with main_reasoning, reasoning_order, key_precedents, notes


@app.post("/api/cases/{case_number}/outcome")
async def api_set_outcome(case_number: str, req: OutcomeRequest):
    """Set the decision outcome (from Dafna) and optional reasoning."""
    case = await db.get_case_by_number(case_number)
    if not case:
        raise HTTPException(404, f"תיק {case_number} לא נמצא")  # "Case ... not found"
    case_id = UUID(case["id"])

    # Update or create decision record
    pool = await db.get_pool()
    async with pool.acquire() as conn:
        existing = await conn.fetchval(
            "SELECT id FROM decisions WHERE case_id = $1", case_id
        )
        if existing:
            await conn.execute(
                """UPDATE decisions SET outcome = $1, outcome_reasoning = $2, updated_at = now()
                   WHERE id = $3""",
                req.outcome, req.reasoning, existing,
            )
        else:
            await conn.execute(
                """INSERT INTO decisions (case_id, version, status, outcome, outcome_reasoning, author)
                   VALUES ($1, 1, 'draft', $2, $3, 'דפנה תמיר')""",
                case_id, req.outcome, req.reasoning,
            )

        # Update case status: supplying reasoning counts as an approved direction
        new_status = "direction_approved" if req.reasoning else "outcome_set"
        await conn.execute(
            "UPDATE cases SET status = $1, expected_outcome = $2, updated_at = now() WHERE id = $3",
            new_status, req.outcome, case_id,
        )

    return {"status": new_status, "outcome": req.outcome, "has_reasoning": bool(req.reasoning)}


@app.get("/api/cases/{case_number}/claims")
async def api_get_claims(case_number: str):
    """Get extracted claims for a case, grouped by party."""
    case = await db.get_case_by_number(case_number)
    if not case:
        raise HTTPException(404, f"תיק {case_number} לא נמצא")

    pool = await db.get_pool()
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            """SELECT party_role, claim_text, claim_index, source_document, addressed_in_paragraph
               FROM claims WHERE case_id = $1 ORDER BY party_role, claim_index""",
            UUID(case["id"]),
        )

    claims_by_party: dict[str, list[dict]] = {}
    for r in rows:
        claims_by_party.setdefault(r["party_role"], []).append(dict(r))

    return {"case_number": case_number, "claims": claims_by_party, "total": len(rows)}


@app.post("/api/cases/{case_number}/direction")
async def api_set_direction(case_number: str, req: DirectionRequest):
    """Save the approved direction document for the discussion block."""
    case = await db.get_case_by_number(case_number)
    if not case:
        raise HTTPException(404, f"תיק {case_number} לא נמצא")

    pool = await db.get_pool()
    async with pool.acquire() as conn:
        await conn.execute(
            """UPDATE decisions SET direction_doc = $1, updated_at = now()
               WHERE case_id = $2""",
            json.dumps(req.direction_doc, ensure_ascii=False),
            UUID(case["id"]),
        )
        await conn.execute(
            "UPDATE cases SET status = 'direction_approved', updated_at = now() WHERE id = $1",
            UUID(case["id"]),
        )

    return {"status": "direction_approved", "direction_doc": req.direction_doc}


@app.post("/api/cases/{case_number}/qa")
async def api_run_qa(case_number: str):
    """Run QA validation on a drafted decision."""
    case = await db.get_case_by_number(case_number)
    if not case:
        raise HTTPException(404, f"תיק {case_number} לא נמצא")
    case_id = UUID(case["id"])

    pool = await db.get_pool()
    async with pool.acquire() as conn:
        decision = await conn.fetchrow(
            "SELECT id FROM decisions WHERE case_id = $1", case_id
        )
        if not decision:
            raise HTTPException(404, "אין החלטה לתיק זה")  # "No decision for this case"
        decision_id = decision["id"]

        # Delete previous QA results
        await conn.execute("DELETE FROM qa_results WHERE decision_id = $1", decision_id)

        # Load the data the checks need
        blocks = await conn.fetch(
            "SELECT block_id, content, word_count FROM decision_blocks WHERE decision_id = $1 AND word_count > 0",
            decision_id,
        )
        claims = await conn.fetch(
            "SELECT id, claim_text, addressed_in_paragraph FROM claims WHERE case_id = $1",
            case_id,
        )

        checks = []

        # Check 1: every claim must be addressed in some paragraph
        unanswered = [c for c in claims if c["addressed_in_paragraph"] is None]
        checks.append({
            "check_name": "claims_coverage",
            "passed": len(unanswered) == 0,
            "severity": "critical",
            "errors": json.dumps([{"claim": c["claim_text"][:80]} for c in unanswered], ensure_ascii=False),
            "details": f"{len(claims) - len(unanswered)}/{len(claims)} טענות נענו",  # "N/M claims answered"
        })

        # Check 2: the discussion block (block-yod) should be 30-75% of the total word count
        total_words = sum(b["word_count"] for b in blocks)
        yod = next((b for b in blocks if b["block_id"] == "block-yod"), None)
        yod_pct = (yod["word_count"] / total_words * 100) if yod and total_words > 0 else 0
        checks.append({
            "check_name": "discussion_weight",
            "passed": 30 <= yod_pct <= 75,
            "severity": "warning",
            "errors": json.dumps([]),
            "details": f"בלוק דיון: {yod_pct:.1f}% (טווח: 30-75%)",  # "Discussion block: X% (range: 30-75%)"
        })

        # Check 3: the background block (block-vav) must stay neutral, i.e. contain none of
        # these judgmental words ("anomalous", "sin", "problematic", "shocking",
        # "outrageous", "ridiculous", "criminal", "defective")
        vav = next((b for b in blocks if b["block_id"] == "block-vav"), None)
        bad_words = ["חריג", "חטא", "בעייתי", "מזעזע", "שערורייתי", "מגוחך", "נפשע", "פגום"]
        found_bad = []
        if vav and vav["content"]:
            found_bad = [word for word in bad_words if word in vav["content"]]
        checks.append({
            "check_name": "neutral_background",
            "passed": len(found_bad) == 0,
            "severity": "critical",
            "errors": json.dumps(found_bad, ensure_ascii=False),
            # "OK" / "judgmental words found: [...]"
            "details": "תקין" if not found_bad else f"נמצאו מילות שיפוט: {found_bad}",
        })

        # Check 4: sequential numbering (placeholder: always passes for now)
        checks.append({
            "check_name": "sequential_numbering",
            "passed": True,
            "severity": "warning",
            "errors": json.dumps([]),
            "details": "בדיקה בסיסית עברה",  # "Basic check passed"
        })

        # Save results; only critical checks decide the overall result
        all_passed = all(c["passed"] for c in checks if c["severity"] == "critical")
        for check in checks:
            await conn.execute(
                """INSERT INTO qa_results (decision_id, case_id, check_name, passed, severity, errors, details)
                   VALUES ($1, $2, $3, $4, $5, $6, $7)""",
                decision_id, case_id, check["check_name"], check["passed"],
                check["severity"], check["errors"], check["details"],
            )

        # Update status
        new_status = "drafted" if all_passed else "qa_review"
        await conn.execute(
            "UPDATE cases SET status = $1, updated_at = now() WHERE id = $2",
            new_status, case_id,
        )

    return {"passed": all_passed, "checks": checks, "status": new_status}


@app.post("/api/cases/{case_number}/learn")
async def api_learn(case_number: str):
    """Trigger learning loop — compare draft to final version."""
    case = await db.get_case_by_number(case_number)
    if not case:
        raise HTTPException(404, f"תיק {case_number} לא נמצא")

    # For now, only mark the case as final; no draft-vs-final comparison is performed yet
    pool = await db.get_pool()
    async with pool.acquire() as conn:
        await conn.execute(
            "UPDATE cases SET status = 'final', updated_at = now() WHERE id = $1",
            UUID(case["id"]),
        )

    # "Learning loop triggered, final version recorded"
    return {"status": "final", "message": "לולאת למידה הופעלה — גרסה סופית נקלטה"}


@app.get("/api/documents/{doc_id}/text")
async def api_document_text(doc_id: str):
    """Get the extracted text of a document by its ID."""
    try:
        document_uuid = UUID(doc_id)
    except ValueError:
        raise HTTPException(400, f"Invalid document ID: {doc_id}")

    text = await db.get_document_text(document_uuid)
    if not text:
        raise HTTPException(404, f"Document {doc_id} not found or has no text")
    return {"doc_id": doc_id, "text": text}


# ── Din Leumi Endpoint ────────────────────────────────────────────

class DinLeumiRequest(BaseModel):
    filename: str
    title: str = ""


@app.post("/api/classify-dinleumi")
async def classify_dinleumi(req: DinLeumiRequest):
    """Upload a decision to Din Leumi with automatic metadata extraction."""
    source = UPLOAD_DIR / req.filename
    if not source.is_file() or not source.parent.samefile(UPLOAD_DIR):
        raise HTTPException(404, "File not found in uploads")

    task_id = str(uuid4())
    _progress[task_id] = {"status": "queued", "filename": req.filename}
    _spawn(_process_dinleumi_decision(task_id, source, req))

    return {"task_id": task_id}


# ── Metadata Extraction ──────────────────────────────────────────

# Hebrew prompt: the model acts as an analyst of labour-court rulings on National
# Insurance matters, returns only a JSON object with the fields below, and picks
# "topics" from the listed benefit types (general disability, work injury,
# maternity pay, unemployment, income support, and so on).
METADATA_EXTRACTION_PROMPT = """אתה מנתח פסקי דין של בתי דין לעבודה בתחום ביטוח לאומי.
חלץ את המטאדאטא הבאה מתוך פסק הדין והחזר אותה כ-JSON בלבד:
{
  "title": "כותרת תיאורית קצרה של פסק הדין",
  "court": "שם בית המשפט (למשל: בית הדין האזורי לעבודה תל אביב)",
  "decision_date": "YYYY-MM-DD או null אם לא נמצא",
  "case_number": "מספר תיק (למשל: בל 12345-06-20)",
  "judge": "שם השופט/ת",
  "parties_appellant": "שם התובע/מערער",
  "parties_respondent": "שם הנתבע/משיב",
  "topics": ["רשימת נושאים רלוונטיים מתוך הרשימה למטה"],
  "outcome": "accepted/rejected/partial/remanded",
  "summary": "תקציר של 2-3 משפטים"
}

נושאים אפשריים: נכות כללית, נכות מעבודה, תאונת עבודה, דמי לידה, דמי אבטלה, גמלת הבטחת הכנסה, גמלת ניידות, גמלת סיעוד, קצבת זקנה, קצבת שאירים, מילואים, דמי פגיעה, נפגעי פעולות איבה

החזר JSON בלבד, ללא טקסט נוסף."""

_anthropic_client: anthropic.Anthropic | None = None


def _get_anthropic() -> anthropic.Anthropic:
    global _anthropic_client
    if _anthropic_client is None:
        _anthropic_client = anthropic.Anthropic(api_key=config.ANTHROPIC_API_KEY)
    return _anthropic_client


async def _extract_metadata_with_claude(text: str) -> dict:
    """Extract metadata from decision text using Claude."""
    client = _get_anthropic()
    # Use the first ~5000 chars (usually contains all metadata)
    excerpt = text[:5000]

    # The SDK client used here is synchronous; run the call in a worker thread
    # so it does not block the event loop while the model responds.
    message = await asyncio.to_thread(
        client.messages.create,
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[
            {
                "role": "user",
                "content": f"{METADATA_EXTRACTION_PROMPT}\n\nפסק הדין:\n{excerpt}",
            }
        ],
    )

    response_text = message.content[0].text.strip()

    # Parse JSON from response (handle potential markdown code-fence wrapping)
    if response_text.startswith("```"):
        response_text = response_text.split("```")[1]
        if response_text.startswith("json"):
            response_text = response_text[4:]

    try:
        metadata = json.loads(response_text)
    except json.JSONDecodeError:
        logger.warning("Failed to parse metadata JSON: %s", response_text[:200])
        metadata = {}

    # Callers use dict access, so discard any other JSON type
    if not isinstance(metadata, dict):
        metadata = {}
    return metadata


# ── Background Processing ─────────────────────────────────────────

async def _process_file(task_id: str, source: Path, req: ClassifyRequest):
    """Process a classified file in the background."""
    try:
        if req.category == "case":
            await _process_case_document(task_id, source, req)
        else:
            await _process_training_document(task_id, source, req)
    except Exception as e:
        logger.exception("Processing failed for %s", req.filename)
        _progress[task_id] = {"status": "failed", "error": str(e), "filename": req.filename}


async def _process_case_document(task_id: str, source: Path, req: ClassifyRequest):
    """Process a case document (mirrors documents.document_upload logic)."""
    _progress[task_id] = {"status": "validating", "filename": req.filename}

    case = await db.get_case_by_number(req.case_number)
    if not case:
        _progress[task_id] = {
            "status": "failed",
            "error": f"Case {req.case_number} not found",
            "filename": req.filename,
        }
        return

    case_id = UUID(case["id"])
    title = req.title or source.stem.split("_", 1)[-1]  # Remove timestamp prefix

    # Copy to case directory
    _progress[task_id] = {"status": "copying", "filename": req.filename}
    case_dir = config.CASES_DIR / req.case_number / "documents"
    case_dir.mkdir(parents=True, exist_ok=True)

    # Use the original name without the timestamp prefix
    original_name = re.sub(r"^\d+_", "", source.name)
    dest = case_dir / original_name
    shutil.copy2(str(source), str(dest))

    # Create document record
    _progress[task_id] = {"status": "registering", "filename": req.filename}
    doc = await db.create_document(
        case_id=case_id,
        doc_type=req.doc_type,
        title=title,
        file_path=str(dest),
    )

    # Process (extract → chunk → embed → store)
    _progress[task_id] = {"status": "processing", "filename": req.filename, "step": "extracting"}
    result = await processor.process_document(UUID(doc["id"]), case_id)

    # Git commit
    repo_dir = config.CASES_DIR / req.case_number
    if repo_dir.exists():
        subprocess.run(["git", "add", "."], cwd=repo_dir, capture_output=True)
        # Hebrew labels for the commit message: notice of appeal, response,
        # decision, reference document, exhibit
        doc_type_hebrew = {
            "appeal": "כתב ערר",
            "response": "תשובה",
            "decision": "החלטה",
            "reference": "מסמך עזר",
            "exhibit": "נספח",
        }.get(req.doc_type, req.doc_type)
        subprocess.run(
            ["git", "commit", "-m", f"הוספת {doc_type_hebrew}: {title}"],  # "Add <type>: <title>"
            cwd=repo_dir,
            capture_output=True,
            env={
                "GIT_AUTHOR_NAME": "Ezer Mishpati",
                "GIT_AUTHOR_EMAIL": "legal@local",
                "GIT_COMMITTER_NAME": "Ezer Mishpati",
                "GIT_COMMITTER_EMAIL": "legal@local",
                "PATH": "/usr/bin:/bin",
            },
        )

    # Remove from uploads
    source.unlink(missing_ok=True)

    _progress[task_id] = {
        "status": "completed",
        "filename": req.filename,
        "result": result,
        "case_number": req.case_number,
        "doc_type": req.doc_type,
    }


async def _process_training_document(task_id: str, source: Path, req: ClassifyRequest):
    """Process a training document (mirrors documents.document_upload_training logic)."""
    from datetime import date as date_type

    title = req.title or source.stem.split("_", 1)[-1]

    # Copy to training directory
    _progress[task_id] = {"status": "copying", "filename": req.filename}
    config.TRAINING_DIR.mkdir(parents=True, exist_ok=True)
    original_name = re.sub(r"^\d+_", "", source.name)
    dest = config.TRAINING_DIR / original_name
    shutil.copy2(str(source), str(dest))

    # Extract text
    _progress[task_id] = {"status": "processing", "filename": req.filename, "step": "extracting"}
    text, page_count = await extractor.extract_text(str(dest))

    # Parse date (an invalid ISO date raises ValueError and the task is reported as failed)
    d_date = None
    if req.decision_date:
        d_date = date_type.fromisoformat(req.decision_date)

    # Add to style corpus
    _progress[task_id] = {"status": "processing", "filename": req.filename, "step": "corpus"}
    corpus_id = await db.add_to_style_corpus(
        document_id=None,
        decision_number=req.decision_number,
        decision_date=d_date,
        subject_categories=req.subject_categories,
        full_text=text,
    )

    # Chunk and embed
    _progress[task_id] = {"status": "processing", "filename": req.filename, "step": "chunking"}
    chunks = chunker.chunk_document(text)
    chunk_count = 0
    if chunks:
        doc = await db.create_document(
            case_id=None,
            doc_type="decision",
            title=f"[קורפוס] {title}",  # "[corpus] <title>"
            file_path=str(dest),
            page_count=page_count,
        )
        doc_id = UUID(doc["id"])
        await db.update_document(doc_id, extracted_text=text, extraction_status="completed")

        _progress[task_id] = {"status": "processing", "filename": req.filename, "step": "embedding"}
        texts = [c.content for c in chunks]
        embs = await embeddings.embed_texts(texts, input_type="document")
        chunk_dicts = [
            {
                "content": c.content,
                "section_type": c.section_type,
                "embedding": emb,
                "page_number": c.page_number,
                "chunk_index": c.chunk_index,
            }
            for c, emb in zip(chunks, embs)
        ]
        await db.store_chunks(doc_id, None, chunk_dicts)
        chunk_count = len(chunks)

    # Remove from uploads
    source.unlink(missing_ok=True)

    _progress[task_id] = {
        "status": "completed",
        "filename": req.filename,
        "result": {
            "corpus_id": str(corpus_id),
            "title": title,
            "pages": page_count,
            "text_length": len(text),
            "chunks": chunk_count,
        },
    }


async def _process_dinleumi_decision(task_id: str, source: Path, req: DinLeumiRequest):
    """Process a National Insurance court decision with automatic metadata extraction."""
    from datetime import date as date_type

    try:
        # Step 1: Copy to the din-leumi decisions directory (timestamp suffix on a name clash)
        _progress[task_id] = {"status": "copying", "filename": req.filename}
        original_name = re.sub(r"^\d+_", "", source.name)
        dest = dl_config.DECISIONS_DIR / original_name
        if dest.exists():
            dest = dl_config.DECISIONS_DIR / f"{dest.stem}_{int(time.time())}{dest.suffix}"
        shutil.copy2(str(source), str(dest))

        # Step 2: Extract text
        _progress[task_id] = {"status": "processing", "filename": req.filename, "step": "extracting"}
        text, page_count = await dl_extractor.extract_text(str(dest))

        # Step 3: Extract metadata with Claude
        _progress[task_id] = {"status": "processing", "filename": req.filename, "step": "extracting_metadata"}
        metadata = await _extract_metadata_with_claude(text)

        # Parse date
        d_date = None
        if metadata.get("decision_date"):
            try:
                d_date = date_type.fromisoformat(metadata["decision_date"])
            except (ValueError, TypeError):
                d_date = None

        # The model may return null for any field, so fall back with `or`
        # rather than relying on a .get() default
        title = req.title or metadata.get("title") or original_name.rsplit(".", 1)[0]

        # Step 4: Create decision record
        _progress[task_id] = {"status": "registering", "filename": req.filename}
        decision = await dl_db.create_decision(
            title=title,
            file_path=str(dest),
            court=metadata.get("court") or "",
            decision_date=d_date,
            case_number=metadata.get("case_number") or "",
            judge=metadata.get("judge") or "",
            parties_appellant=metadata.get("parties_appellant") or "",
            # Default respondent: the National Insurance Institute
            parties_respondent=metadata.get("parties_respondent") or "המוסד לביטוח לאומי",
            topics=metadata.get("topics"),
            outcome=metadata.get("outcome") or "",
        )
        decision_id = UUID(decision["id"])

        # Update with extracted text
        await dl_db.update_decision(
            decision_id,
            extracted_text=text,
            page_count=page_count,
            summary=metadata.get("summary") or "",
        )

        # Step 5: Chunk
        _progress[task_id] = {"status": "processing", "filename": req.filename, "step": "chunking"}
        from din_leumi.services import chunker as dl_chunker, embeddings as dl_embeddings

        chunks = dl_chunker.chunk_document(text)
        chunk_count = 0
        if chunks:
            # Step 6: Embed
            _progress[task_id] = {"status": "processing", "filename": req.filename, "step": "embedding"}
            texts = [c.content for c in chunks]
            embs = await dl_embeddings.embed_texts(texts, input_type="document")
            chunk_dicts = [
                {
                    "content": c.content,
                    "section_type": c.section_type,
                    "embedding": emb,
                    "page_number": c.page_number,
                    "chunk_index": c.chunk_index,
                }
                for c, emb in zip(chunks, embs)
            ]
            await dl_db.store_chunks(decision_id, chunk_dicts)
            chunk_count = len(chunks)

        await dl_db.update_decision(decision_id, extraction_status="completed")
        await dl_db.ensure_ivfflat_index()

        # Remove from uploads
        source.unlink(missing_ok=True)

        _progress[task_id] = {
            "status": "completed",
            "filename": req.filename,
            "system": "din-leumi",
            "result": {
                "decision_id": str(decision_id),
                "title": title,
                "pages": page_count,
                "text_length": len(text),
                "chunks": chunk_count,
            },
            "metadata": {
                "court": metadata.get("court", ""),
                "judge": metadata.get("judge", ""),
                "case_number": metadata.get("case_number", ""),
                "decision_date": metadata.get("decision_date", ""),
                "outcome": metadata.get("outcome", ""),
                "topics": metadata.get("topics") or [],
                "summary": metadata.get("summary", ""),
            },
        }

    except Exception as e:
        logger.exception("Din Leumi processing failed for %s", req.filename)
        _progress[task_id] = {"status": "failed", "error": str(e), "filename": req.filename}
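

# ── Illustrative sketch: tolerant JSON parsing (not wired into the code above) ──
# _extract_metadata_with_claude strips a Markdown fence only when the reply
# *starts* with one. A slightly more tolerant variant, sketched here under the
# assumption that the reply contains exactly one JSON object, also copes with
# prose before or after the object. The helper name is hypothetical; it uses
# only the standard library.
import json
import re


def _parse_json_object_sketch(response_text: str) -> dict:
    """Return the JSON object found in a model reply, or {} if none parses."""
    # Greedy match from the first "{" to the last "}" so nested objects stay whole
    match = re.search(r"\{.*\}", response_text, re.DOTALL)
    if not match:
        return {}
    try:
        parsed = json.loads(match.group(0))
    except json.JSONDecodeError:
        return {}
    # Callers expect a dict; anything else is treated as "no metadata"
    return parsed if isinstance(parsed, dict) else {}

# Usage: _parse_json_object_sketch('```json\n{"court": "X"}\n```') returns {"court": "X"}.
# The greedy match would misfire if the surrounding prose itself contained braces.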
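

# ── Illustrative sketch: sequential paragraph numbering (not wired in) ──
# Check 4 in api_run_qa ("sequential_numbering") currently always passes. A
# minimal sketch of a real check, assuming the paragraph numbers have already
# been parsed out of the decision blocks as integers in document order (that
# parsing step is not shown). The helper name is hypothetical. It reports
# duplicates, numbers that go backwards, and gaps.


def _numbering_errors_sketch(numbers: list[int]) -> list[str]:
    """Return human-readable problems found in a paragraph-number sequence."""
    errors: list[str] = []
    # Compare each number with its predecessor
    for prev, curr in zip(numbers, numbers[1:]):
        if curr == prev:
            errors.append(f"duplicate paragraph {curr}")
        elif curr < prev:
            errors.append(f"paragraph {curr} follows {prev}")
        elif curr > prev + 1:
            errors.append(f"gap between {prev} and {curr}")
    return errors

# Usage: _numbering_errors_sketch([1, 2, 4]) returns ["gap between 2 and 4"];
# the check would pass when the returned list is empty.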