New endpoints: outcome, direction, claims, QA validation, learning loop, document text retrieval. Updated Dockerfile and project documentation. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
919 lines
32 KiB
Python
919 lines
32 KiB
Python
"""Ezer Mishpati — Web upload interface for legal documents."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from contextlib import asynccontextmanager
|
|
from pathlib import Path
|
|
from uuid import UUID, uuid4
|
|
|
|
# Make the sibling MCP server packages importable without installing them:
#   * legal_mcp -> <repo>/mcp-server/src (resolved relative to this file)
#   * din_leumi -> ~/din-leumi/mcp-server/src
# Each path is pushed to the FRONT of sys.path, so the one inserted last
# (din_leumi) ends up first in the import search order.
for _src_dir in (
    Path(__file__).resolve().parent.parent / "mcp-server" / "src",
    Path.home() / "din-leumi" / "mcp-server" / "src",
):
    sys.path.insert(0, str(_src_dir))
del _src_dir
|
|
|
|
from fastapi import FastAPI, File, HTTPException, UploadFile
|
|
from fastapi.responses import FileResponse, StreamingResponse
|
|
from fastapi.staticfiles import StaticFiles
|
|
from pydantic import BaseModel
|
|
|
|
from legal_mcp import config
|
|
from legal_mcp.services import chunker, db, embeddings, extractor, processor
|
|
from legal_mcp.tools import cases as cases_tools, search as search_tools, workflow as workflow_tools, drafting as drafting_tools
|
|
|
|
# Din Leumi imports (aliased to avoid collision)
|
|
from din_leumi import config as dl_config
|
|
from din_leumi.services import db as dl_db
|
|
from din_leumi.services import processor as dl_processor
|
|
from din_leumi.services import extractor as dl_extractor
|
|
|
|
import anthropic
|
|
|
|
logger = logging.getLogger(__name__)

# Staging area for files uploaded through the web UI, before they are classified.
UPLOAD_DIR = config.DATA_DIR / "uploads"
# File types accepted by /api/upload (matched against the lower-cased suffix).
ALLOWED_EXTENSIONS = {".pdf", ".docx", ".rtf", ".txt"}
# Upper bound for a single upload, in bytes (50 MiB).
MAX_FILE_SIZE = 50 << 20

# task_id -> latest progress snapshot of a background processing task.
# Plain process memory: it is lost on restart and not shared between processes.
_progress: dict[str, dict] = {}
|
|
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan hook for startup and shutdown of storage resources.

    Startup creates the uploads directory and the Din Leumi decisions
    directory, then awaits ``init_schema()`` of the legal_mcp DB module and
    then of the din_leumi DB module. After the application stops, the two
    ``close_pool()`` calls run in the same order.
    """
    for directory in (UPLOAD_DIR, dl_config.DECISIONS_DIR):
        directory.mkdir(parents=True, exist_ok=True)
    for init_schema in (db.init_schema, dl_db.init_schema):
        await init_schema()

    yield

    for close_pool in (db.close_pool, dl_db.close_pool):
        await close_pool()
|
|
|
|
|
|
app = FastAPI(
    title="העלאת מסמכים משפטיים",
    lifespan=lifespan,
)

# Directory holding the static UI assets; "/" serves index.html from here.
STATIC_DIR = Path(__file__).parent.joinpath("static")
|
|
|
|
|
|
# ── API Endpoints ──────────────────────────────────────────────────
|
|
|
|
|
|
@app.get("/")
async def index():
    """Serve the upload UI page (``static/index.html``)."""
    page = STATIC_DIR.joinpath("index.html")
    return FileResponse(page)
|
|
|
|
|
|
@app.post("/api/upload")
async def upload_file(file: UploadFile = File(...)):
    """Store an uploaded document in the pending-uploads directory.

    The stored name is ``<unix-timestamp>_<sanitized-stem><ext>``. The
    background processors strip the leading ``<digits>_`` prefix to recover
    the original name.

    Returns:
        ``{"filename", "original_name", "size"}`` describing the stored file.

    Raises:
        HTTPException(400): no filename, unsupported extension, or the file is
            larger than ``MAX_FILE_SIZE``.
    """
    if not file.filename:
        raise HTTPException(400, "No filename provided")

    ext = Path(file.filename).suffix.lower()
    if ext not in ALLOWED_EXTENSIONS:
        # Sorted so the message is stable (set iteration order is arbitrary).
        allowed = ", ".join(sorted(ALLOWED_EXTENSIONS))
        raise HTTPException(400, f"Unsupported file type: {ext}. Allowed: {allowed}")

    # Keep word characters, Hebrew, spaces, dots, dashes and parentheses. Use a
    # literal space rather than \s so client-supplied newlines/tabs are dropped.
    safe_name = re.sub(r"[^\w\u0590-\u05FF .\-()]", "", Path(file.filename).stem) or "document"

    # Read at most one byte past the limit. That is enough to detect an
    # oversized upload without buffering an arbitrarily large body first.
    content = await file.read(MAX_FILE_SIZE + 1)
    if len(content) > MAX_FILE_SIZE:
        raise HTTPException(400, f"File too large. Max: {MAX_FILE_SIZE // (1024*1024)}MB")

    # Two uploads of the same name within one second would otherwise overwrite
    # each other. Bump the timestamp until the name is free; this keeps the
    # "<digits>_" prefix format that downstream processing strips.
    timestamp = int(time.time())
    while (UPLOAD_DIR / f"{timestamp}_{safe_name}{ext}").exists():
        timestamp += 1
    filename = f"{timestamp}_{safe_name}{ext}"
    dest = UPLOAD_DIR / filename

    # Writing up to MAX_FILE_SIZE bytes is blocking disk I/O; keep it off the event loop.
    await asyncio.to_thread(dest.write_bytes, content)

    return {
        "filename": filename,
        "original_name": file.filename,
        "size": len(content),
    }
|
|
|
|
|
|
@app.get("/api/uploads")
async def list_uploads():
    """List pending uploads of an allowed file type, newest first.

    Each entry is ``{"filename", "size", "uploaded_at"}``, where
    ``uploaded_at`` is the file's mtime as a Unix timestamp. Returns ``[]``
    when the uploads directory does not exist.
    """
    if not UPLOAD_DIR.exists():
        return []

    files = []
    for path in UPLOAD_DIR.iterdir():
        if path.suffix.lower() not in ALLOWED_EXTENSIONS or not path.is_file():
            continue
        try:
            st = path.stat()  # stat once; reused for both size and mtime
        except FileNotFoundError:
            # Removed between iterdir() and stat() (background processing deletes
            # uploads when done). Skip it instead of failing the whole listing.
            continue
        files.append({
            "filename": path.name,
            "size": st.st_size,
            "uploaded_at": st.st_mtime,
        })

    files.sort(key=lambda entry: entry["uploaded_at"], reverse=True)
    return files
|
|
|
|
|
|
@app.delete("/api/uploads/{filename}")
async def delete_upload(filename: str):
    """Delete a pending upload by its stored file name.

    Only bare file names of regular files directly inside ``UPLOAD_DIR`` are
    accepted.

    Raises:
        HTTPException(404): the name is not a plain file name (e.g. ``..``), or
            no such regular file exists in the uploads directory.
    """
    path = UPLOAD_DIR / filename
    # "." / ".." / names containing a separator must never match. In
    # particular, UPLOAD_DIR / ".." has UPLOAD_DIR as its parent, so a
    # parent-directory check alone lets it through to unlink() on a directory.
    if filename in (".", "..") or Path(filename).name != filename or not path.is_file():
        raise HTTPException(404, "File not found")
    try:
        path.unlink()
    except FileNotFoundError:
        # Deleted concurrently, e.g. background processing finished with it.
        raise HTTPException(404, "File not found") from None
    return {"deleted": filename}
|
|
|
|
|
|
class ClassifyRequest(BaseModel):
    # Request body for POST /api/classify: which pending upload to process and how.
    filename: str  # stored name inside the uploads directory (as returned by /api/upload)
    category: str # "training" or "case"
    # For case documents
    case_number: str = ""  # required by /api/classify when category == "case"
    doc_type: str = "appeal"  # stored on the document record; also picks the Hebrew label in the git commit message
    title: str = ""  # empty -> derived from the stored file name during processing
    # For training documents
    decision_number: str = ""
    decision_date: str = ""  # parsed with date.fromisoformat, i.e. ISO "YYYY-MM-DD"; empty -> no date
    subject_categories: list[str] = []
|
|
|
|
|
|
# Strong references to in-flight classification tasks. The event loop keeps
# only weak references to tasks, so an otherwise unreferenced task can be
# garbage-collected before it finishes. Each task removes itself when done.
_classify_tasks: set[asyncio.Task] = set()


@app.post("/api/classify")
async def classify_file(req: ClassifyRequest):
    """Classify a pending upload and start processing it in the background.

    Returns:
        ``{"task_id": ...}``; progress is available from
        ``/api/progress/{task_id}``.

    Raises:
        HTTPException(404): ``req.filename`` is not the plain name of an
            existing file in the uploads directory.
        HTTPException(400): unknown category, or a case document without
            ``case_number``.
    """
    source = UPLOAD_DIR / req.filename
    # Accept bare names only: ".", ".." and paths with separators could point
    # at the uploads directory itself or outside it.
    if (
        req.filename in (".", "..")
        or Path(req.filename).name != req.filename
        or not source.is_file()
    ):
        raise HTTPException(404, "File not found in uploads")

    if req.category not in ("training", "case"):
        raise HTTPException(400, "Category must be 'training' or 'case'")

    if req.category == "case" and not req.case_number:
        raise HTTPException(400, "case_number required for case documents")

    task_id = str(uuid4())
    _progress[task_id] = {"status": "queued", "filename": req.filename}

    task = asyncio.create_task(_process_file(task_id, source, req))
    _classify_tasks.add(task)
    task.add_done_callback(_classify_tasks.discard)

    return {"task_id": task_id}
|
|
|
|
|
|
@app.get("/api/progress/{task_id}")
async def progress_stream(task_id: str):
    """Stream a background task's progress as Server-Sent Events.

    Emits the current ``_progress[task_id]`` snapshot about once per second
    until its status is ``completed`` or ``failed``. Once a stream has seen
    that terminal status, the entry is removed 30 seconds later.

    Raises:
        HTTPException(404): unknown (or already expired) ``task_id``.
    """
    if task_id not in _progress:
        raise HTTPException(404, "Task not found")

    async def event_stream():
        while True:
            data = _progress.get(task_id)
            if data is None:
                # Entry already expired (e.g. cleaned up after another stream of
                # the same task). Stop instead of emitting "{}" forever.
                return
            finished = data.get("status") in ("completed", "failed")
            if finished:
                # Schedule expiry BEFORE yielding: if the client disconnects, the
                # generator may be abandoned at the yield, and cleanup placed
                # after it would never run.
                asyncio.get_running_loop().call_later(30, _progress.pop, task_id, None)
            yield f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
            if finished:
                break
            await asyncio.sleep(1)
        # Keep the connection open for the same 30-second expiry window.
        await asyncio.sleep(30)

    return StreamingResponse(event_stream(), media_type="text/event-stream")
|
|
|
|
|
|
@app.get("/health")
async def health():
    """Liveness probe; always answers ``{"status": "ok"}``."""
    return dict(status="ok")
|
|
|
|
|
|
@app.get("/api/cases")
async def list_cases():
    """Return every case as ``{case_number, title, status}`` for the UI dropdown."""
    summary_keys = ("case_number", "title", "status")
    all_cases = await db.list_cases()
    return [{key: row[key] for key in summary_keys} for row in all_cases]
|
|
|
|
|
|
# ── Paperclip Integration API ─────────────────────────────────────
|
|
|
|
|
|
class CaseCreateRequest(BaseModel):
    # Request body for POST /api/cases/create. Every field is forwarded as a
    # keyword argument of the same name to cases_tools.case_create.
    case_number: str
    title: str
    appellants: list[str] | None = None
    respondents: list[str] | None = None
    subject: str = ""
    property_address: str = ""
    permit_number: str = ""
    committee_type: str = "ועדה מקומית"  # default: "local committee"
    hearing_date: str = ""  # plain string; no date validation at this layer
    notes: str = ""
    expected_outcome: str = ""
|
|
|
|
|
|
class CaseUpdateRequest(BaseModel):
    # Request body for PUT /api/cases/{case_number}. Every field is forwarded as
    # a keyword argument of the same name to cases_tools.case_update.
    # NOTE(review): the empty-string / None defaults presumably mean "leave this
    # field unchanged" — confirm in cases_tools.case_update.
    status: str = ""
    title: str = ""
    subject: str = ""
    notes: str = ""
    hearing_date: str = ""
    decision_date: str = ""
    tags: list[str] | None = None
    expected_outcome: str = ""
|
|
|
|
|
|
@app.post("/api/cases/create")
async def api_case_create(req: CaseCreateRequest):
    """Create a new appeal case via ``cases_tools.case_create``.

    Returns the tool's JSON reply as-is.

    Raises:
        HTTPException(400): the tool replied with non-JSON text. The sibling
            case endpoints treat such text as an error message; this endpoint
            does the same instead of failing with a 500.
    """
    result = await cases_tools.case_create(
        case_number=req.case_number,
        title=req.title,
        appellants=req.appellants,
        respondents=req.respondents,
        subject=req.subject,
        property_address=req.property_address,
        permit_number=req.permit_number,
        committee_type=req.committee_type,
        hearing_date=req.hearing_date,
        notes=req.notes,
        expected_outcome=req.expected_outcome,
    )
    try:
        return json.loads(result)
    except json.JSONDecodeError:
        raise HTTPException(400, result)
|
|
|
|
|
|
@app.get("/api/cases/{case_number}/details")
async def api_case_get(case_number: str):
    """Return the full case record, including its documents.

    ``cases_tools.case_get`` replies with JSON on success. Any other text is
    passed back to the caller as a 404 detail.
    """
    raw = await cases_tools.case_get(case_number)
    try:
        details = json.loads(raw)
    except json.JSONDecodeError:
        raise HTTPException(404, raw)
    return details
|
|
|
|
|
|
@app.put("/api/cases/{case_number}")
async def api_case_update(case_number: str, req: CaseUpdateRequest):
    """Apply the fields of *req* to an existing case via ``cases_tools.case_update``.

    A non-JSON reply from the tool is passed back to the caller as a 404 detail.
    """
    changes = dict(
        status=req.status,
        title=req.title,
        subject=req.subject,
        notes=req.notes,
        hearing_date=req.hearing_date,
        decision_date=req.decision_date,
        tags=req.tags,
        expected_outcome=req.expected_outcome,
    )
    raw = await cases_tools.case_update(case_number=case_number, **changes)
    try:
        updated = json.loads(raw)
    except json.JSONDecodeError:
        raise HTTPException(404, raw)
    return updated
|
|
|
|
|
|
@app.get("/api/cases/{case_number}/status")
async def api_case_status(case_number: str):
    """Return the workflow status of a case from ``workflow_tools.workflow_status``.

    A non-JSON reply from the tool is passed back to the caller as a 404 detail.
    """
    raw = await workflow_tools.workflow_status(case_number)
    try:
        status = json.loads(raw)
    except json.JSONDecodeError:
        raise HTTPException(404, raw)
    return status
|
|
|
|
|
|
@app.get("/api/search")
async def api_search(query: str, limit: int = 10, section_type: str = ""):
    """Semantic search across all decisions and documents.

    JSON tool output is returned as-is; any other text is wrapped as
    ``{"message": text}``.
    """
    raw = await search_tools.search_decisions(query, limit, section_type)
    try:
        hits = json.loads(raw)
    except json.JSONDecodeError:
        hits = {"message": raw}
    return hits
|
|
|
|
|
|
@app.get("/api/cases/{case_number}/search")
async def api_case_search(case_number: str, query: str, limit: int = 10):
    """Semantic search restricted to one case's documents.

    JSON tool output is returned as-is; any other text is wrapped as
    ``{"message": text}``.
    """
    raw = await search_tools.search_case_documents(case_number, query, limit)
    try:
        hits = json.loads(raw)
    except json.JSONDecodeError:
        hits = {"message": raw}
    return hits
|
|
|
|
|
|
@app.get("/api/cases/{case_number}/template")
async def api_case_template(case_number: str):
    """Return the outcome-aware decision template for a case as ``{"template": ...}``.

    Raises:
        HTTPException(404): the tool's reply starts with "תיק" ("case"),
            presumably its "case ... not found" message — TODO confirm in
            drafting_tools.get_decision_template.
    """
    template = await drafting_tools.get_decision_template(case_number)
    if not template.startswith("תיק"):
        return {"template": template}
    raise HTTPException(404, template)
|
|
|
|
|
|
@app.get("/api/processing-status")
async def api_processing_status():
    """Return the overall processing status from ``workflow_tools.processing_status``.

    JSON tool output is returned as-is. Any other text is wrapped as
    ``{"message": text}``, like the search endpoints, instead of failing the
    request with a 500 from ``json.loads``.
    """
    result = await workflow_tools.processing_status()
    try:
        return json.loads(result)
    except json.JSONDecodeError:
        return {"message": result}
|
|
|
|
|
|
# ── Workflow API — outcome, direction, claims, QA, learning ──────
|
|
|
|
|
|
class OutcomeRequest(BaseModel):
    # Request body for POST /api/cases/{case_number}/outcome.
    outcome: str # rejection / full_acceptance / partial_acceptance
    # Optional free-text reasoning. When non-empty, the outcome endpoint moves the
    # case to "direction_approved"; when empty, to "outcome_set".
    reasoning: str = ""


class DirectionRequest(BaseModel):
    # Request body for POST /api/cases/{case_number}/direction; stored as JSON
    # text in decisions.direction_doc.
    direction_doc: dict # JSON document with main_reasoning, reasoning_order, key_precedents, notes
|
|
|
|
|
|
@app.post("/api/cases/{case_number}/outcome")
async def api_set_outcome(case_number: str, req: OutcomeRequest):
    """Record the decision outcome (from Dafna) and optional reasoning for a case.

    Updates the latest decision version of the case, or inserts a version-1
    draft if the case has none. It then stores the outcome on the case and
    moves the case to ``direction_approved`` (reasoning given) or
    ``outcome_set`` (no reasoning).

    Returns:
        ``{"status", "outcome", "has_reasoning"}``.

    Raises:
        HTTPException(404): no case with this number.
    """
    case = await db.get_case_by_number(case_number)
    if not case:
        raise HTTPException(404, f"תיק {case_number} לא נמצא")

    case_id = UUID(case["id"])
    new_status = "direction_approved" if req.reasoning else "outcome_set"

    pool = await db.get_pool()
    async with pool.acquire() as conn:
        # One transaction, so the decision row and the case status cannot end up
        # disagreeing if one of the statements fails.
        async with conn.transaction():
            # With several decision versions, target the newest one, not an arbitrary row.
            existing = await conn.fetchval(
                "SELECT id FROM decisions WHERE case_id = $1 ORDER BY version DESC LIMIT 1",
                case_id,
            )
            if existing:
                await conn.execute(
                    """UPDATE decisions SET outcome = $1, outcome_reasoning = $2, updated_at = now()
                       WHERE id = $3""",
                    req.outcome, req.reasoning, existing,
                )
            else:
                await conn.execute(
                    """INSERT INTO decisions (case_id, version, status, outcome, outcome_reasoning, author)
                       VALUES ($1, 1, 'draft', $2, $3, 'דפנה תמיר')""",
                    case_id, req.outcome, req.reasoning,
                )

            await conn.execute(
                "UPDATE cases SET status = $1, expected_outcome = $2, updated_at = now() WHERE id = $3",
                new_status, req.outcome, case_id,
            )

    return {"status": new_status, "outcome": req.outcome, "has_reasoning": bool(req.reasoning)}
|
|
|
|
|
|
@app.get("/api/cases/{case_number}/claims")
async def api_get_claims(case_number: str):
    """Return a case's extracted claims grouped by ``party_role``.

    Rows are ordered by party role and then claim index.

    Returns:
        ``{"case_number", "claims": {party_role: [claim, ...]}, "total"}``.

    Raises:
        HTTPException(404): no case with this number.
    """
    case = await db.get_case_by_number(case_number)
    if not case:
        raise HTTPException(404, f"תיק {case_number} לא נמצא")

    pool = await db.get_pool()
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            """SELECT party_role, claim_text, claim_index, source_document, addressed_in_paragraph
               FROM claims WHERE case_id = $1 ORDER BY party_role, claim_index""",
            UUID(case["id"]),
        )

    grouped = {}
    for row in rows:
        grouped.setdefault(row["party_role"], []).append(dict(row))

    return {"case_number": case_number, "claims": grouped, "total": len(rows)}
|
|
|
|
|
|
@app.post("/api/cases/{case_number}/direction")
async def api_set_direction(case_number: str, req: DirectionRequest):
    """Save the approved direction document used for the discussion block.

    Stores ``req.direction_doc`` (as JSON text) on the case's decision row(s)
    and moves the case to ``direction_approved``.

    Raises:
        HTTPException(404): unknown case, or the case has no decision row yet
            (e.g. the outcome was never set). Nothing is changed in that case.
    """
    case = await db.get_case_by_number(case_number)
    if not case:
        raise HTTPException(404, f"תיק {case_number} לא נמצא")

    case_id = UUID(case["id"])
    pool = await db.get_pool()
    async with pool.acquire() as conn:
        async with conn.transaction():
            updated = await conn.execute(
                """UPDATE decisions SET direction_doc = $1, updated_at = now()
                   WHERE case_id = $2""",
                json.dumps(req.direction_doc, ensure_ascii=False),
                case_id,
            )
            # execute() returns the command tag, e.g. "UPDATE 0". Without a
            # decision row the direction would be silently discarded while the
            # case still moved to direction_approved. Fail instead; raising
            # inside the transaction rolls it back.
            if updated.rsplit(" ", 1)[-1] == "0":
                raise HTTPException(404, "אין החלטה לתיק זה")
            await conn.execute(
                "UPDATE cases SET status = 'direction_approved', updated_at = now() WHERE id = $1",
                case_id,
            )

    return {"status": "direction_approved", "direction_doc": req.direction_doc}
|
|
|
|
|
|
@app.post("/api/cases/{case_number}/qa")
async def api_run_qa(case_number: str):
    """Run rule-based QA checks on the latest decision draft of a case.

    Previous QA results for that decision are replaced. The case moves to
    ``drafted`` when every *critical* check passes, otherwise to ``qa_review``.

    Returns:
        ``{"passed": bool, "checks": [...], "status": new_status}``.

    Raises:
        HTTPException(404): unknown case, or the case has no decision.
    """
    case = await db.get_case_by_number(case_number)
    if not case:
        raise HTTPException(404, f"תיק {case_number} לא נמצא")

    case_id = UUID(case["id"])
    pool = await db.get_pool()

    async with pool.acquire() as conn:
        # With several decision versions, check the newest one, not an arbitrary row.
        decision_id = await conn.fetchval(
            "SELECT id FROM decisions WHERE case_id = $1 ORDER BY version DESC LIMIT 1",
            case_id,
        )
        if decision_id is None:
            raise HTTPException(404, "אין החלטה לתיק זה")

        blocks = await conn.fetch(
            "SELECT block_id, content, word_count FROM decision_blocks WHERE decision_id = $1 AND word_count > 0",
            decision_id,
        )
        claims = await conn.fetch(
            "SELECT id, claim_text, addressed_in_paragraph FROM claims WHERE case_id = $1",
            case_id,
        )

        checks = _build_qa_checks(blocks, claims)
        all_passed = all(c["passed"] for c in checks if c["severity"] == "critical")
        new_status = "drafted" if all_passed else "qa_review"

        # Replace the old results and update the status atomically. A failure
        # midway can then never leave the old results deleted with only part
        # of the new ones stored.
        async with conn.transaction():
            await conn.execute("DELETE FROM qa_results WHERE decision_id = $1", decision_id)
            for check in checks:
                await conn.execute(
                    """INSERT INTO qa_results (decision_id, case_id, check_name, passed, severity, errors, details)
                       VALUES ($1, $2, $3, $4, $5, $6, $7)""",
                    decision_id, case_id, check["check_name"], check["passed"],
                    check["severity"], check["errors"], check["details"],
                )
            await conn.execute(
                "UPDATE cases SET status = $1, updated_at = now() WHERE id = $2",
                new_status, case_id,
            )

    return {"passed": all_passed, "checks": checks, "status": new_status}


def _build_qa_checks(blocks, claims) -> list[dict]:
    """Evaluate the QA rules over a decision's blocks and the case's claims.

    *blocks* rows must expose ``block_id``, ``content`` and ``word_count``.
    *claims* rows must expose ``claim_text`` and ``addressed_in_paragraph``.
    Each returned check is a dict with ``check_name``, ``passed``,
    ``severity`` ("critical" or "warning"), ``errors`` (a JSON-encoded string,
    as stored in qa_results) and ``details`` (a human-readable Hebrew summary).
    """
    checks = []

    # Check 1 (critical): every extracted claim is addressed in some paragraph.
    unanswered = [c for c in claims if c["addressed_in_paragraph"] is None]
    checks.append({
        "check_name": "claims_coverage",
        "passed": len(unanswered) == 0,
        "severity": "critical",
        # claim_text may be NULL; slicing None would crash the whole QA run.
        "errors": json.dumps(
            [{"claim": (c["claim_text"] or "")[:80]} for c in unanswered],
            ensure_ascii=False,
        ),
        "details": f"{len(claims) - len(unanswered)}/{len(claims)} טענות נענו",
    })

    # Check 2 (warning): the discussion block (block-yod) is 30-75% of all words.
    total_words = sum(b["word_count"] for b in blocks)
    yod = next((b for b in blocks if b["block_id"] == "block-yod"), None)
    yod_pct = (yod["word_count"] / total_words * 100) if yod and total_words > 0 else 0
    checks.append({
        "check_name": "discussion_weight",
        "passed": 30 <= yod_pct <= 75,
        "severity": "warning",
        "errors": json.dumps([]),
        "details": f"בלוק דיון: {yod_pct:.1f}% (טווח: 30-75%)",
    })

    # Check 3 (critical): the background block (block-vav) contains none of the
    # listed judgmental words. A missing block passes trivially.
    vav = next((b for b in blocks if b["block_id"] == "block-vav"), None)
    bad_words = ["חריג", "חטא", "בעייתי", "מזעזע", "שערורייתי", "מגוחך", "נפשע", "פגום"]
    found_bad = []
    if vav and vav["content"]:
        found_bad = [word for word in bad_words if word in vav["content"]]
    checks.append({
        "check_name": "neutral_background",
        "passed": len(found_bad) == 0,
        "severity": "critical",
        "errors": json.dumps(found_bad, ensure_ascii=False),
        "details": "תקין" if not found_bad else f"נמצאו מילות שיפוט: {found_bad}",
    })

    # Check 4 (warning): placeholder. Numbering is not actually inspected yet,
    # so this check always passes.
    checks.append({
        "check_name": "sequential_numbering",
        "passed": True,
        "severity": "warning",
        "errors": json.dumps([]),
        "details": "בדיקה בסיסית עברה",
    })

    return checks
|
|
|
|
|
|
@app.post("/api/cases/{case_number}/learn")
async def api_learn(case_number: str):
    """Trigger the learning loop for a case.

    The draft-vs-final comparison is not implemented yet. For now this only
    sets the case status to ``final`` and logs the event.

    Raises:
        HTTPException(404): no case with this number.
    """
    case = await db.get_case_by_number(case_number)
    if not case:
        raise HTTPException(404, f"תיק {case_number} לא נמצא")

    pool = await db.get_pool()
    async with pool.acquire() as conn:
        await conn.execute(
            "UPDATE cases SET status = 'final', updated_at = now() WHERE id = $1",
            UUID(case["id"]),
        )

    # The placeholder was meant to "mark as final and log"; previously nothing was logged.
    logger.info("Learning loop triggered for case %s; status set to final", case_number)

    return {"status": "final", "message": "לולאת למידה הופעלה — גרסה סופית נקלטה"}
|
|
|
|
|
|
@app.get("/api/documents/{doc_id}/text")
async def api_document_text(doc_id: str):
    """Return ``{"doc_id", "text"}`` with a document's extracted text.

    Raises:
        HTTPException(400): *doc_id* is not a valid UUID.
        HTTPException(404): no such document, or its text is empty.
    """
    try:
        document_uuid = UUID(doc_id)
    except ValueError:
        raise HTTPException(400, f"Invalid document ID: {doc_id}")

    text = await db.get_document_text(document_uuid)
    if text:
        return {"doc_id": doc_id, "text": text}
    raise HTTPException(404, f"Document {doc_id} not found or has no text")
|
|
|
|
|
|
# ── Din Leumi Endpoint ────────────────────────────────────────────
|
|
|
|
|
|
class DinLeumiRequest(BaseModel):
    # Request body for POST /api/classify-dinleumi.
    filename: str  # stored name inside the uploads directory
    title: str = ""  # optional; empty -> title extracted by Claude, else the file name without extension
|
|
|
|
|
|
# Strong references to in-flight Din Leumi processing tasks. The event loop
# keeps only weak references to tasks, so an unreferenced task can be
# garbage-collected before it finishes. Each task removes itself when done.
_dinleumi_tasks: set[asyncio.Task] = set()


@app.post("/api/classify-dinleumi")
async def classify_dinleumi(req: DinLeumiRequest):
    """Send a pending upload to Din Leumi, with metadata extracted automatically.

    Processing runs in the background; the returned ``{"task_id": ...}`` can
    be followed via ``/api/progress/{task_id}``.

    Raises:
        HTTPException(404): ``req.filename`` is not the plain name of an
            existing file in the uploads directory.
    """
    source = UPLOAD_DIR / req.filename
    # Accept bare names only: ".", ".." and paths with separators could point
    # at the uploads directory itself or outside it.
    if (
        req.filename in (".", "..")
        or Path(req.filename).name != req.filename
        or not source.is_file()
    ):
        raise HTTPException(404, "File not found in uploads")

    task_id = str(uuid4())
    _progress[task_id] = {"status": "queued", "filename": req.filename}

    task = asyncio.create_task(_process_dinleumi_decision(task_id, source, req))
    _dinleumi_tasks.add(task)
    task.add_done_callback(_dinleumi_tasks.discard)

    return {"task_id": task_id}
|
|
|
|
|
|
# ── Metadata Extraction ──────────────────────────────────────────
|
|
|
|
METADATA_EXTRACTION_PROMPT = """אתה מנתח פסקי דין של בתי דין לעבודה בתחום ביטוח לאומי.
|
|
חלץ את המטאדאטא הבאה מתוך פסק הדין והחזר אותה כ-JSON בלבד:
|
|
|
|
{
|
|
"title": "כותרת תיאורית קצרה של פסק הדין",
|
|
"court": "שם בית המשפט (למשל: בית הדין האזורי לעבודה תל אביב)",
|
|
"decision_date": "YYYY-MM-DD או null אם לא נמצא",
|
|
"case_number": "מספר תיק (למשל: בל 12345-06-20)",
|
|
"judge": "שם השופט/ת",
|
|
"parties_appellant": "שם התובע/מערער",
|
|
"parties_respondent": "שם הנתבע/משיב",
|
|
"topics": ["רשימת נושאים רלוונטיים מתוך הרשימה למטה"],
|
|
"outcome": "accepted/rejected/partial/remanded",
|
|
"summary": "תקציר של 2-3 משפטים"
|
|
}
|
|
|
|
נושאים אפשריים: נכות כללית, נכות מעבודה, תאונת עבודה, דמי לידה, דמי אבטלה, גמלת הבטחת הכנסה, גמלת ניידות, גמלת סיעוד, קצבת זקנה, קצבת שאירים, מילואים, דמי פגיעה, נפגעי פעולות איבה
|
|
|
|
החזר JSON בלבד, ללא טקסט נוסף."""
|
|
|
|
|
|
# Process-wide Anthropic client; created lazily by _get_anthropic().
_anthropic_client: anthropic.Anthropic | None = None


def _get_anthropic() -> anthropic.Anthropic:
    """Return the shared Anthropic client, building it on first use.

    The client is constructed with ``config.ANTHROPIC_API_KEY`` and cached in
    the module-level ``_anthropic_client`` for later calls.
    """
    global _anthropic_client
    if _anthropic_client is not None:
        return _anthropic_client
    _anthropic_client = anthropic.Anthropic(api_key=config.ANTHROPIC_API_KEY)
    return _anthropic_client
|
|
|
|
|
|
async def _extract_metadata_with_claude(text: str) -> dict:
    """Extract structured metadata from a decision's text using Claude.

    Only the first 5000 characters are sent, which usually contain the header
    metadata. The reply is expected to be the JSON object requested by
    ``METADATA_EXTRACTION_PROMPT``.

    Returns:
        The parsed metadata dict, or ``{}`` when the reply has no text, is not
        valid JSON, or is JSON but not an object. Callers use ``.get`` on the
        result, so it must always be a dict.
    """
    client = _get_anthropic()
    excerpt = text[:5000]

    # The Anthropic client here is synchronous. Run the network round-trip in
    # a worker thread so it does not block the event loop and every other request.
    message = await asyncio.to_thread(
        client.messages.create,
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[
            {
                "role": "user",
                "content": f"{METADATA_EXTRACTION_PROMPT}\n\nפסק הדין:\n{excerpt}",
            }
        ],
    )

    # Concatenate the text blocks; an empty content list yields "" instead of IndexError.
    response_text = "".join(getattr(block, "text", "") for block in message.content).strip()

    # Claude may wrap the JSON in a ```json fence or add surrounding prose.
    # Parse the outermost {...} span when one exists.
    start, end = response_text.find("{"), response_text.rfind("}")
    candidate = response_text[start:end + 1] if start != -1 and end > start else response_text

    try:
        metadata = json.loads(candidate)
    except json.JSONDecodeError:
        logger.warning("Failed to parse metadata JSON: %s", response_text[:200])
        return {}

    if not isinstance(metadata, dict):
        logger.warning("Metadata JSON is not an object: %s", response_text[:200])
        return {}

    return metadata
|
|
|
|
|
|
# ── Background Processing ─────────────────────────────────────────
|
|
|
|
|
|
async def _process_file(task_id: str, source: Path, req: ClassifyRequest):
    """Background entry point for /api/classify; dispatches on ``req.category``.

    "case" uploads go to ``_process_case_document``. Any other category goes
    to ``_process_training_document`` (the endpoint only admits "training").
    Exceptions do not escape the task: they are logged and recorded as a
    ``failed`` progress entry.
    """
    try:
        handler = _process_case_document if req.category == "case" else _process_training_document
        await handler(task_id, source, req)
    except Exception as e:
        logger.exception("Processing failed for %s", req.filename)
        _progress[task_id] = {"status": "failed", "error": str(e), "filename": req.filename}
|
|
|
|
|
|
async def _process_case_document(task_id: str, source: Path, req: ClassifyRequest):
    """Attach a pending upload to an existing case and index it.

    Steps, each reported in ``_progress[task_id]``:
    1. Look up the case.
    2. Copy the file into ``<CASES_DIR>/<case_number>/documents``.
    3. Create the document record.
    4. Run ``processor.process_document`` (extract → chunk → embed → store).
    5. Commit the case directory to git (best effort).
    6. Delete the upload.

    A missing case is recorded as ``failed``; other errors propagate to
    ``_process_file``.
    """
    _progress[task_id] = {"status": "validating", "filename": req.filename}

    case = await db.get_case_by_number(req.case_number)
    if not case:
        _progress[task_id] = {
            "status": "failed",
            "error": f"Case {req.case_number} not found",
            "filename": req.filename,  # same shape as the other failure entries
        }
        return

    case_id = UUID(case["id"])
    title = req.title or source.stem.split("_", 1)[-1]  # drop the upload timestamp prefix

    # Copy into the case's documents directory under the original (un-prefixed) name.
    _progress[task_id] = {"status": "copying", "filename": req.filename}
    case_dir = config.CASES_DIR / req.case_number / "documents"
    case_dir.mkdir(parents=True, exist_ok=True)
    original_name = re.sub(r"^\d+_", "", source.name)
    dest = case_dir / original_name
    if dest.exists():
        # Never overwrite a file that an earlier document record already points
        # to (same scheme as the Din Leumi flow).
        dest = case_dir / f"{dest.stem}_{int(time.time())}{dest.suffix}"
    await asyncio.to_thread(shutil.copy2, str(source), str(dest))

    _progress[task_id] = {"status": "registering", "filename": req.filename}
    doc = await db.create_document(
        case_id=case_id,
        doc_type=req.doc_type,
        title=title,
        file_path=str(dest),
    )

    _progress[task_id] = {"status": "processing", "filename": req.filename, "step": "extracting"}
    result = await processor.process_document(UUID(doc["id"]), case_id)

    repo_dir = config.CASES_DIR / req.case_number
    if repo_dir.exists():
        doc_type_hebrew = {
            "appeal": "כתב ערר", "response": "תשובה", "decision": "החלטה",
            "reference": "מסמך עזר", "exhibit": "נספח",
        }.get(req.doc_type, req.doc_type)
        await asyncio.to_thread(_commit_case_repo, repo_dir, f"הוספת {doc_type_hebrew}: {title}")

    source.unlink(missing_ok=True)

    _progress[task_id] = {
        "status": "completed",
        "filename": req.filename,
        "result": result,
        "case_number": req.case_number,
        "doc_type": req.doc_type,
    }


def _commit_case_repo(repo_dir: Path, message: str) -> None:
    """Stage and commit everything in *repo_dir*; log rather than raise on failure.

    Git is best effort: by the time this runs the document is already stored
    and indexed, so a git problem must not turn the upload into a failure.
    """
    git_env = {
        "GIT_AUTHOR_NAME": "Ezer Mishpati", "GIT_AUTHOR_EMAIL": "legal@local",
        "GIT_COMMITTER_NAME": "Ezer Mishpati", "GIT_COMMITTER_EMAIL": "legal@local",
        "PATH": "/usr/bin:/bin",
    }
    try:
        subprocess.run(["git", "add", "."], cwd=repo_dir, capture_output=True)
        commit = subprocess.run(
            ["git", "commit", "-m", message],
            cwd=repo_dir, capture_output=True, env=git_env,
        )
    except OSError:
        # e.g. the git executable is not available in this environment
        logger.warning("git unavailable; skipped commit in %s", repo_dir, exc_info=True)
        return
    if commit.returncode != 0:
        output = (commit.stderr or commit.stdout).decode(errors="replace").strip()
        logger.warning("git commit in %s failed: %s", repo_dir, output)
|
|
|
|
|
|
async def _process_training_document(task_id: str, source: Path, req: ClassifyRequest):
    """Add a pending upload to the style-training corpus and index it.

    Steps, each reported in ``_progress[task_id]``:
    1. Validate ``req.decision_date`` (ISO ``YYYY-MM-DD``).
    2. Copy the file into ``config.TRAINING_DIR``, adding a timestamp suffix
       on a name clash.
    3. Extract its text and add it to the style corpus.
    4. If the chunker yields any chunks, create a corpus document record,
       embed the chunks and store them.
    5. Delete the upload.

    Errors propagate to ``_process_file``.
    """
    from datetime import date as date_type

    # Validate the date before any side effects, so a bad value does not leave
    # a stray copy behind in TRAINING_DIR.
    d_date = None
    if req.decision_date:
        try:
            d_date = date_type.fromisoformat(req.decision_date)
        except ValueError as err:
            raise ValueError(
                f"Invalid decision_date {req.decision_date!r}; expected YYYY-MM-DD"
            ) from err

    title = req.title or source.stem.split("_", 1)[-1]  # drop the upload timestamp prefix

    _progress[task_id] = {"status": "copying", "filename": req.filename}
    config.TRAINING_DIR.mkdir(parents=True, exist_ok=True)
    original_name = re.sub(r"^\d+_", "", source.name)
    dest = config.TRAINING_DIR / original_name
    if dest.exists():
        # Do not overwrite an earlier corpus file with the same name.
        dest = config.TRAINING_DIR / f"{dest.stem}_{int(time.time())}{dest.suffix}"
    await asyncio.to_thread(shutil.copy2, str(source), str(dest))

    _progress[task_id] = {"status": "processing", "filename": req.filename, "step": "extracting"}
    text, page_count = await extractor.extract_text(str(dest))

    _progress[task_id] = {"status": "processing", "filename": req.filename, "step": "corpus"}
    corpus_id = await db.add_to_style_corpus(
        document_id=None,
        decision_number=req.decision_number,
        decision_date=d_date,
        subject_categories=req.subject_categories,
        full_text=text,
    )

    _progress[task_id] = {"status": "processing", "filename": req.filename, "step": "chunking"}
    chunks = chunker.chunk_document(text)

    chunk_count = 0
    if chunks:
        doc = await db.create_document(
            case_id=None,
            doc_type="decision",
            title=f"[קורפוס] {title}",
            file_path=str(dest),
            page_count=page_count,
        )
        doc_id = UUID(doc["id"])
        await db.update_document(doc_id, extracted_text=text, extraction_status="completed")

        _progress[task_id] = {"status": "processing", "filename": req.filename, "step": "embedding"}
        embs = await embeddings.embed_texts([c.content for c in chunks], input_type="document")

        chunk_dicts = [
            {
                "content": c.content,
                "section_type": c.section_type,
                "embedding": emb,
                "page_number": c.page_number,
                "chunk_index": c.chunk_index,
            }
            for c, emb in zip(chunks, embs)
        ]
        await db.store_chunks(doc_id, None, chunk_dicts)
        chunk_count = len(chunks)

    source.unlink(missing_ok=True)

    _progress[task_id] = {
        "status": "completed",
        "filename": req.filename,
        "result": {
            "corpus_id": str(corpus_id),
            "title": title,
            "pages": page_count,
            "text_length": len(text),
            "chunks": chunk_count,
        },
    }
|
|
|
|
|
|
async def _process_dinleumi_decision(task_id: str, source: Path, req: DinLeumiRequest):
    """Ingest a National Insurance court decision into the Din Leumi store.

    Steps, each reported in ``_progress[task_id]``:
    1. Copy the upload into ``dl_config.DECISIONS_DIR``, adding a timestamp
       suffix on a name clash.
    2. Extract its text.
    3. Extract metadata with Claude.
    4. Create the decision record and store the text and summary on it.
    5-6. Chunk, embed and store the text.
    7. Mark extraction completed, ensure the IVFFlat index, then delete the upload.

    Exceptions are logged and recorded as a ``failed`` progress entry instead
    of escaping the task.
    """
    from datetime import date as date_type

    try:
        # Step 1: Copy to din-leumi decisions directory
        _progress[task_id] = {"status": "copying", "filename": req.filename}
        original_name = re.sub(r"^\d+_", "", source.name)
        dest = dl_config.DECISIONS_DIR / original_name
        if dest.exists():
            dest = dl_config.DECISIONS_DIR / f"{dest.stem}_{int(time.time())}{dest.suffix}"
        await asyncio.to_thread(shutil.copy2, str(source), str(dest))

        # Step 2: Extract text
        _progress[task_id] = {"status": "processing", "filename": req.filename, "step": "extracting"}
        text, page_count = await dl_extractor.extract_text(str(dest))

        # Step 3: Extract metadata with Claude
        _progress[task_id] = {"status": "processing", "filename": req.filename, "step": "extracting_metadata"}
        metadata = await _extract_metadata_with_claude(text)
        if not isinstance(metadata, dict):
            metadata = {}

        def meta(key: str, default=""):
            """Metadata value for *key*, or *default* when missing, null or empty.

            The prompt allows JSON null (e.g. for decision_date), and
            ``dict.get(key, default)`` would return that None instead of the default.
            """
            return metadata.get(key) or default

        d_date = None
        if meta("decision_date"):
            try:
                d_date = date_type.fromisoformat(metadata["decision_date"])
            except (ValueError, TypeError):
                d_date = None

        title = req.title or meta("title", original_name.rsplit(".", 1)[0])

        # Step 4: Create decision record
        _progress[task_id] = {"status": "registering", "filename": req.filename}
        decision = await dl_db.create_decision(
            title=title,
            file_path=str(dest),
            court=meta("court"),
            decision_date=d_date,
            case_number=meta("case_number"),
            judge=meta("judge"),
            parties_appellant=meta("parties_appellant"),
            parties_respondent=meta("parties_respondent", "המוסד לביטוח לאומי"),
            topics=metadata.get("topics"),
            outcome=meta("outcome"),
        )
        decision_id = UUID(decision["id"])

        await dl_db.update_decision(
            decision_id,
            extracted_text=text,
            page_count=page_count,
            summary=meta("summary"),
        )

        # Step 5: Chunk
        _progress[task_id] = {"status": "processing", "filename": req.filename, "step": "chunking"}
        from din_leumi.services import chunker as dl_chunker, embeddings as dl_embeddings
        chunks = dl_chunker.chunk_document(text)

        chunk_count = 0
        if chunks:
            # Step 6: Embed
            _progress[task_id] = {"status": "processing", "filename": req.filename, "step": "embedding"}
            embs = await dl_embeddings.embed_texts([c.content for c in chunks], input_type="document")

            chunk_dicts = [
                {
                    "content": c.content,
                    "section_type": c.section_type,
                    "embedding": emb,
                    "page_number": c.page_number,
                    "chunk_index": c.chunk_index,
                }
                for c, emb in zip(chunks, embs)
            ]
            await dl_db.store_chunks(decision_id, chunk_dicts)
            chunk_count = len(chunks)

        await dl_db.update_decision(decision_id, extraction_status="completed")
        await dl_db.ensure_ivfflat_index()

        source.unlink(missing_ok=True)

        _progress[task_id] = {
            "status": "completed",
            "filename": req.filename,
            "system": "din-leumi",
            "result": {
                "decision_id": str(decision_id),
                "title": title,
                "pages": page_count,
                "text_length": len(text),
                "chunks": chunk_count,
            },
            "metadata": {
                "court": meta("court"),
                "judge": meta("judge"),
                "case_number": meta("case_number"),
                "decision_date": meta("decision_date"),
                "outcome": meta("outcome"),
                "topics": metadata.get("topics") or [],
                "summary": meta("summary"),
            },
        }

    except Exception as e:
        logger.exception("Din Leumi processing failed for %s", req.filename)
        _progress[task_id] = {"status": "failed", "error": str(e), "filename": req.filename}
|