Files
legal-ai/web/app.py
Chaim 5fc52ce530 Switch to cases/{new,in-progress,completed}/ directory structure
Replace single CASES_DIR with find_case_dir() that searches across
all status directories. New cases created in cases/new/{number}/.

Config: CASES_BASE, CASES_NEW, CASES_IN_PROGRESS, CASES_COMPLETED
Docker: added -v /home/chaim/legal-ai/cases:/cases volume mount

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 10:45:47 +00:00

909 lines
31 KiB
Python

"""Ezer Mishpati — Web upload interface for legal documents."""
from __future__ import annotations
import asyncio
import json
import logging
import re
import shutil
import subprocess
import sys
import time
from contextlib import asynccontextmanager
from pathlib import Path
from uuid import UUID, uuid4
# Allow importing legal_mcp from the MCP server source
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "mcp-server" / "src"))
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.responses import FileResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from legal_mcp import config
from legal_mcp.services import chunker, db, embeddings, extractor, processor
from legal_mcp.tools import cases as cases_tools, search as search_tools, workflow as workflow_tools, drafting as drafting_tools
# Import integration clients (same directory)
_web_dir = Path(__file__).resolve().parent
sys.path.insert(0, str(_web_dir.parent))
from web.gitea_client import create_repo, setup_remote_and_push
from web.paperclip_client import create_project as pc_create_project, get_project_url
logger = logging.getLogger(__name__)
UPLOAD_DIR = config.DATA_DIR / "uploads"
ALLOWED_EXTENSIONS = {".pdf", ".docx", ".rtf", ".txt"}
MAX_FILE_SIZE = 50 * 1024 * 1024 # 50MB
# In-memory progress tracking
_progress: dict[str, dict] = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
await db.init_schema()
yield
await db.close_pool()
app = FastAPI(title="העלאת מסמכים משפטיים", lifespan=lifespan)
STATIC_DIR = Path(__file__).parent / "static"
# ── API Endpoints ──────────────────────────────────────────────────
@app.get("/")
async def index():
return FileResponse(STATIC_DIR / "index.html")
@app.post("/api/upload")
async def upload_file(file: UploadFile = File(...)):
"""Upload a file to the temporary uploads directory."""
if not file.filename:
raise HTTPException(400, "No filename provided")
# Validate extension
ext = Path(file.filename).suffix.lower()
if ext not in ALLOWED_EXTENSIONS:
raise HTTPException(400, f"Unsupported file type: {ext}. Allowed: {', '.join(ALLOWED_EXTENSIONS)}")
# Sanitize filename
safe_name = re.sub(r"[^\w\u0590-\u05FF\s.\-()]", "", Path(file.filename).stem)
if not safe_name:
safe_name = "document"
timestamp = int(time.time())
filename = f"{timestamp}_{safe_name}{ext}"
# Read and validate size
content = await file.read()
if len(content) > MAX_FILE_SIZE:
raise HTTPException(400, f"File too large. Max: {MAX_FILE_SIZE // (1024*1024)}MB")
dest = UPLOAD_DIR / filename
dest.write_bytes(content)
return {
"filename": filename,
"original_name": file.filename,
"size": len(content),
}
@app.get("/api/uploads")
async def list_uploads():
"""List files in the uploads (pending) directory."""
if not UPLOAD_DIR.exists():
return []
files = []
for f in sorted(UPLOAD_DIR.iterdir(), key=lambda p: p.stat().st_mtime, reverse=True):
if f.is_file() and f.suffix.lower() in ALLOWED_EXTENSIONS:
stat = f.stat()
files.append({
"filename": f.name,
"size": stat.st_size,
"uploaded_at": stat.st_mtime,
})
return files
@app.delete("/api/uploads/{filename}")
async def delete_upload(filename: str):
"""Remove a file from the uploads directory."""
path = UPLOAD_DIR / filename
if not path.exists() or not path.parent.samefile(UPLOAD_DIR):
raise HTTPException(404, "File not found")
path.unlink()
return {"deleted": filename}
class ClassifyRequest(BaseModel):
filename: str
category: str # "training" or "case"
# For case documents
case_number: str = ""
doc_type: str = "appeal"
title: str = ""
# For training documents
decision_number: str = ""
decision_date: str = ""
subject_categories: list[str] = []
@app.post("/api/classify")
async def classify_file(req: ClassifyRequest):
"""Classify a pending file and start processing."""
source = UPLOAD_DIR / req.filename
if not source.exists() or not source.parent.samefile(UPLOAD_DIR):
raise HTTPException(404, "File not found in uploads")
if req.category not in ("training", "case"):
raise HTTPException(400, "Category must be 'training' or 'case'")
if req.category == "case" and not req.case_number:
raise HTTPException(400, "case_number required for case documents")
task_id = str(uuid4())
_progress[task_id] = {"status": "queued", "filename": req.filename}
asyncio.create_task(_process_file(task_id, source, req))
return {"task_id": task_id}
@app.get("/api/progress/{task_id}")
async def progress_stream(task_id: str):
"""SSE stream of processing progress."""
if task_id not in _progress:
raise HTTPException(404, "Task not found")
async def event_stream():
while True:
data = _progress.get(task_id, {})
yield f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
if data.get("status") in ("completed", "failed"):
break
await asyncio.sleep(1)
# Clean up after a delay
await asyncio.sleep(30)
_progress.pop(task_id, None)
return StreamingResponse(event_stream(), media_type="text/event-stream")
@app.get("/health")
async def health():
return {"status": "ok"}
@app.get("/api/cases")
async def list_cases(detail: bool = False):
"""List existing cases. With detail=true, includes doc counts and integration URLs."""
cases = await db.list_cases()
if not detail:
return [
{"case_number": c["case_number"], "title": c["title"], "status": c["status"]}
for c in cases
]
# Enhanced listing with document counts
pool = await db.get_pool()
result = []
async with pool.acquire() as conn:
for c in cases:
case_id = UUID(c["id"])
doc_count = await conn.fetchval(
"SELECT count(*) FROM documents WHERE case_id = $1", case_id
)
result.append({
"case_number": c["case_number"],
"title": c["title"],
"status": c["status"],
"expected_outcome": c.get("expected_outcome", ""),
"committee_type": c.get("committee_type", ""),
"hearing_date": str(c["hearing_date"]) if c.get("hearing_date") else "",
"document_count": doc_count,
"gitea_url": f"https://gitea.nautilus.marcusgroup.org/cases/{c['case_number']}",
})
return result
# ── Paperclip Integration API ─────────────────────────────────────
class CaseCreateRequest(BaseModel):
case_number: str
title: str
appellants: list[str] | None = None
respondents: list[str] | None = None
subject: str = ""
property_address: str = ""
permit_number: str = ""
committee_type: str = "ועדה מקומית"
hearing_date: str = ""
notes: str = ""
expected_outcome: str = ""
class CaseUpdateRequest(BaseModel):
status: str = ""
title: str = ""
subject: str = ""
notes: str = ""
hearing_date: str = ""
decision_date: str = ""
tags: list[str] | None = None
expected_outcome: str = ""
@app.post("/api/cases/create")
async def api_case_create(req: CaseCreateRequest):
"""Create a new appeal case."""
result = await cases_tools.case_create(
case_number=req.case_number,
title=req.title,
appellants=req.appellants,
respondents=req.respondents,
subject=req.subject,
property_address=req.property_address,
permit_number=req.permit_number,
committee_type=req.committee_type,
hearing_date=req.hearing_date,
notes=req.notes,
expected_outcome=req.expected_outcome,
)
return json.loads(result)
@app.get("/api/cases/{case_number}/details")
async def api_case_get(case_number: str):
"""Get full case details including documents."""
result = await cases_tools.case_get(case_number)
try:
return json.loads(result)
except json.JSONDecodeError:
raise HTTPException(404, result)
@app.put("/api/cases/{case_number}")
async def api_case_update(case_number: str, req: CaseUpdateRequest):
"""Update case details."""
result = await cases_tools.case_update(
case_number=case_number,
status=req.status,
title=req.title,
subject=req.subject,
notes=req.notes,
hearing_date=req.hearing_date,
decision_date=req.decision_date,
tags=req.tags,
expected_outcome=req.expected_outcome,
)
try:
return json.loads(result)
except json.JSONDecodeError:
raise HTTPException(404, result)
@app.get("/api/cases/{case_number}/status")
async def api_case_status(case_number: str):
"""Get full workflow status for a case."""
result = await workflow_tools.workflow_status(case_number)
try:
return json.loads(result)
except json.JSONDecodeError:
raise HTTPException(404, result)
@app.get("/api/search")
async def api_search(query: str, limit: int = 10, section_type: str = ""):
"""Semantic search across decisions and documents."""
result = await search_tools.search_decisions(query, limit, section_type)
try:
return json.loads(result)
except json.JSONDecodeError:
return {"message": result}
@app.get("/api/cases/{case_number}/search")
async def api_case_search(case_number: str, query: str, limit: int = 10):
"""Semantic search within a specific case's documents."""
result = await search_tools.search_case_documents(case_number, query, limit)
try:
return json.loads(result)
except json.JSONDecodeError:
return {"message": result}
@app.get("/api/cases/{case_number}/template")
async def api_case_template(case_number: str):
"""Get outcome-aware decision template for a case."""
result = await drafting_tools.get_decision_template(case_number)
if result.startswith("תיק"):
raise HTTPException(404, result)
return {"template": result}
@app.get("/api/processing-status")
async def api_processing_status():
"""Get overall processing status."""
result = await workflow_tools.processing_status()
return json.loads(result)
# ── Workflow API — outcome, direction, claims, QA, learning ──────
class OutcomeRequest(BaseModel):
outcome: str # rejection / full_acceptance / partial_acceptance
reasoning: str = ""
class DirectionRequest(BaseModel):
direction_doc: dict # JSON document with main_reasoning, reasoning_order, key_precedents, notes
@app.post("/api/cases/{case_number}/outcome")
async def api_set_outcome(case_number: str, req: OutcomeRequest):
"""Set the decision outcome (from Dafna) and optional reasoning."""
case = await db.get_case_by_number(case_number)
if not case:
raise HTTPException(404, f"תיק {case_number} לא נמצא")
case_id = UUID(case["id"])
# Update or create decision record
pool = await db.get_pool()
async with pool.acquire() as conn:
existing = await conn.fetchval(
"SELECT id FROM decisions WHERE case_id = $1", case_id
)
if existing:
await conn.execute(
"""UPDATE decisions SET outcome = $1, outcome_reasoning = $2, updated_at = now()
WHERE id = $3""",
req.outcome, req.reasoning, existing,
)
else:
await conn.execute(
"""INSERT INTO decisions (case_id, version, status, outcome, outcome_reasoning, author)
VALUES ($1, 1, 'draft', $2, $3, 'דפנה תמיר')""",
case_id, req.outcome, req.reasoning,
)
# Update case status
new_status = "direction_approved" if req.reasoning else "outcome_set"
await conn.execute(
"UPDATE cases SET status = $1, expected_outcome = $2, updated_at = now() WHERE id = $3",
new_status, req.outcome, case_id,
)
return {"status": new_status, "outcome": req.outcome, "has_reasoning": bool(req.reasoning)}
@app.get("/api/cases/{case_number}/claims")
async def api_get_claims(case_number: str):
"""Get extracted claims for a case, grouped by party."""
case = await db.get_case_by_number(case_number)
if not case:
raise HTTPException(404, f"תיק {case_number} לא נמצא")
pool = await db.get_pool()
async with pool.acquire() as conn:
rows = await conn.fetch(
"""SELECT party_role, claim_text, claim_index, source_document, addressed_in_paragraph
FROM claims WHERE case_id = $1 ORDER BY party_role, claim_index""",
UUID(case["id"]),
)
claims_by_party = {}
for r in rows:
role = r["party_role"]
if role not in claims_by_party:
claims_by_party[role] = []
claims_by_party[role].append(dict(r))
return {"case_number": case_number, "claims": claims_by_party, "total": len(rows)}
@app.post("/api/cases/{case_number}/direction")
async def api_set_direction(case_number: str, req: DirectionRequest):
"""Save the approved direction document for the discussion block."""
case = await db.get_case_by_number(case_number)
if not case:
raise HTTPException(404, f"תיק {case_number} לא נמצא")
pool = await db.get_pool()
async with pool.acquire() as conn:
await conn.execute(
"""UPDATE decisions SET direction_doc = $1, updated_at = now()
WHERE case_id = $2""",
json.dumps(req.direction_doc, ensure_ascii=False),
UUID(case["id"]),
)
await conn.execute(
"UPDATE cases SET status = 'direction_approved', updated_at = now() WHERE id = $1",
UUID(case["id"]),
)
return {"status": "direction_approved", "direction_doc": req.direction_doc}
@app.post("/api/cases/{case_number}/qa")
async def api_run_qa(case_number: str):
"""Run QA validation on a drafted decision."""
case = await db.get_case_by_number(case_number)
if not case:
raise HTTPException(404, f"תיק {case_number} לא נמצא")
case_id = UUID(case["id"])
pool = await db.get_pool()
async with pool.acquire() as conn:
decision = await conn.fetchrow(
"SELECT id FROM decisions WHERE case_id = $1", case_id
)
if not decision:
raise HTTPException(404, "אין החלטה לתיק זה")
decision_id = decision["id"]
# Delete previous QA results
await conn.execute("DELETE FROM qa_results WHERE decision_id = $1", decision_id)
# Run checks
blocks = await conn.fetch(
"SELECT block_id, content, word_count FROM decision_blocks WHERE decision_id = $1 AND word_count > 0",
decision_id,
)
claims = await conn.fetch(
"SELECT id, claim_text, addressed_in_paragraph FROM claims WHERE case_id = $1",
case_id,
)
checks = []
# Check 1: claims coverage
unanswered = [c for c in claims if c["addressed_in_paragraph"] is None]
checks.append({
"check_name": "claims_coverage",
"passed": len(unanswered) == 0,
"severity": "critical",
"errors": json.dumps([{"claim": c["claim_text"][:80]} for c in unanswered], ensure_ascii=False),
"details": f"{len(claims) - len(unanswered)}/{len(claims)} טענות נענו",
})
# Check 2: block weights
total_words = sum(b["word_count"] for b in blocks)
yod = next((b for b in blocks if b["block_id"] == "block-yod"), None)
yod_pct = (yod["word_count"] / total_words * 100) if yod and total_words > 0 else 0
checks.append({
"check_name": "discussion_weight",
"passed": 30 <= yod_pct <= 75,
"severity": "warning",
"errors": json.dumps([]),
"details": f"בלוק דיון: {yod_pct:.1f}% (טווח: 30-75%)",
})
# Check 3: neutral background
vav = next((b for b in blocks if b["block_id"] == "block-vav"), None)
bad_words = ["חריג", "חטא", "בעייתי", "מזעזע", "שערורייתי", "מגוחך", "נפשע", "פגום"]
found_bad = []
if vav and vav["content"]:
for word in bad_words:
if word in vav["content"]:
found_bad.append(word)
checks.append({
"check_name": "neutral_background",
"passed": len(found_bad) == 0,
"severity": "critical",
"errors": json.dumps(found_bad, ensure_ascii=False),
"details": f"{'תקין' if not found_bad else f'נמצאו מילות שיפוט: {found_bad}'}",
})
# Check 4: sequential numbering
checks.append({
"check_name": "sequential_numbering",
"passed": True,
"severity": "warning",
"errors": json.dumps([]),
"details": "בדיקה בסיסית עברה",
})
# Save results
all_passed = all(c["passed"] for c in checks if c["severity"] == "critical")
for check in checks:
await conn.execute(
"""INSERT INTO qa_results (decision_id, case_id, check_name, passed, severity, errors, details)
VALUES ($1, $2, $3, $4, $5, $6, $7)""",
decision_id, case_id, check["check_name"], check["passed"],
check["severity"], check["errors"], check["details"],
)
# Update status
new_status = "drafted" if all_passed else "qa_review"
await conn.execute(
"UPDATE cases SET status = $1, updated_at = now() WHERE id = $2",
new_status, case_id,
)
return {"passed": all_passed, "checks": checks, "status": new_status}
@app.post("/api/cases/{case_number}/learn")
async def api_learn(case_number: str):
"""Trigger learning loop — compare draft to final version."""
case = await db.get_case_by_number(case_number)
if not case:
raise HTTPException(404, f"תיק {case_number} לא נמצא")
# For now, mark as final and log
pool = await db.get_pool()
async with pool.acquire() as conn:
await conn.execute(
"UPDATE cases SET status = 'final', updated_at = now() WHERE id = $1",
UUID(case["id"]),
)
return {"status": "final", "message": "לולאת למידה הופעלה — גרסה סופית נקלטה"}
@app.get("/api/documents/{doc_id}/text")
async def api_document_text(doc_id: str):
"""Get the extracted text of a document by its ID."""
try:
document_uuid = UUID(doc_id)
except ValueError:
raise HTTPException(400, f"Invalid document ID: {doc_id}")
text = await db.get_document_text(document_uuid)
if not text:
raise HTTPException(404, f"Document {doc_id} not found or has no text")
return {"doc_id": doc_id, "text": text}
# ── Integration Endpoints — Gitea & Paperclip ────────────────────
DOC_TYPE_NAMES = {
"appeal": "כתב-ערר",
"response": "תשובת",
"protocol": "פרוטוקול-דיון",
"plan": "תכנית",
"decision": "החלטה",
"court_decision": "פסק-דין",
"permit": "היתר",
"appraisal": "שומה",
"exhibit": "נספח",
"objection": "התנגדות",
"reference": "מסמך-עזר",
}
def generate_doc_filename(doc_type: str, case_number: str, party_name: str = "", ext: str = ".pdf") -> str:
"""Generate a clear Hebrew filename for a document."""
base = DOC_TYPE_NAMES.get(doc_type, doc_type)
parts = [base]
if party_name:
safe_party = re.sub(r"[^\w\u0590-\u05FF\s]", "", party_name).strip().replace(" ", "-")
parts.append(safe_party)
parts.append(case_number)
return "-".join(parts) + ext
class GiteaRepoRequest(BaseModel):
case_number: str
title: str
description: str = ""
@app.post("/api/integrations/gitea/create-repo")
async def api_gitea_create_repo(req: GiteaRepoRequest):
"""Create a Gitea repo in the 'cases' org and link it to the local case directory."""
try:
repo = await create_repo(req.case_number, req.title, req.description)
except Exception as e:
raise HTTPException(502, f"Gitea error: {e}")
clone_url = repo.get("clone_url") or repo.get("html_url", "")
case_dir = config.find_case_dir(req.case_number)
pushed = False
if case_dir.exists():
pushed = setup_remote_and_push(case_dir, clone_url)
return {
"repo_url": repo.get("html_url", ""),
"clone_url": clone_url,
"pushed": pushed,
}
class PaperclipProjectRequest(BaseModel):
case_number: str
title: str
description: str = ""
appeal_type: str = "רישוי"
@app.post("/api/integrations/paperclip/create-project")
async def api_paperclip_create_project(req: PaperclipProjectRequest):
"""Create a project in Paperclip's embedded DB."""
try:
project = await pc_create_project(
case_number=req.case_number,
title=req.title,
description=req.description,
appeal_type=req.appeal_type,
)
except Exception as e:
raise HTTPException(502, f"Paperclip error: {e}")
return project
@app.post("/api/cases/{case_number}/documents/upload-tagged")
async def api_upload_tagged_document(
case_number: str,
file: UploadFile = File(...),
doc_type: str = Form("auto"),
party_name: str = Form(""),
title: str = Form(""),
):
"""Upload a document to a case with tagging and auto-rename."""
case = await db.get_case_by_number(case_number)
if not case:
raise HTTPException(404, f"תיק {case_number} לא נמצא")
if not file.filename:
raise HTTPException(400, "No filename provided")
ext = Path(file.filename).suffix.lower()
if ext not in ALLOWED_EXTENSIONS:
raise HTTPException(400, f"סוג קובץ לא נתמך: {ext}")
content = await file.read()
if len(content) > MAX_FILE_SIZE:
raise HTTPException(400, f"קובץ גדול מדי. מקסימום: {MAX_FILE_SIZE // (1024*1024)}MB")
# Generate smart filename
new_filename = generate_doc_filename(doc_type, case_number, party_name, ext)
# Save to case directory
case_dir = config.find_case_dir(case_number) / "documents"
case_dir.mkdir(parents=True, exist_ok=True)
dest = case_dir / new_filename
# Handle duplicates
counter = 1
while dest.exists():
stem = new_filename.rsplit(".", 1)[0]
dest = case_dir / f"{stem}-{counter}{ext}"
counter += 1
dest.write_bytes(content)
# Create document record
case_id = UUID(case["id"])
doc_title = title or new_filename.rsplit(".", 1)[0].replace("-", " ")
doc = await db.create_document(
case_id=case_id,
doc_type=doc_type if doc_type != "auto" else "reference",
title=doc_title,
file_path=str(dest),
)
# Process in background
task_id = str(uuid4())
_progress[task_id] = {"status": "queued", "filename": new_filename}
asyncio.create_task(_process_tagged_document(task_id, dest, case_number, case_id, UUID(doc["id"]), doc_type, new_filename))
return {
"task_id": task_id,
"filename": new_filename,
"original_name": file.filename,
"doc_type": doc_type,
}
async def _process_tagged_document(task_id: str, dest: Path, case_number: str, case_id: UUID, doc_id: UUID, doc_type: str, display_name: str):
"""Process an uploaded tagged document in the background."""
try:
_progress[task_id] = {"status": "processing", "filename": display_name, "step": "extracting"}
result = await processor.process_document(doc_id, case_id)
# Git commit + push
repo_dir = config.find_case_dir(case_number)
if repo_dir.exists():
env = {
"GIT_AUTHOR_NAME": "Ezer Mishpati", "GIT_AUTHOR_EMAIL": "legal@local",
"GIT_COMMITTER_NAME": "Ezer Mishpati", "GIT_COMMITTER_EMAIL": "legal@local",
"PATH": "/usr/bin:/bin",
}
doc_type_hebrew = DOC_TYPE_NAMES.get(doc_type, doc_type)
subprocess.run(["git", "add", "."], cwd=repo_dir, capture_output=True)
subprocess.run(
["git", "commit", "-m", f"הוספת {doc_type_hebrew}: {display_name}"],
cwd=repo_dir, capture_output=True, env=env,
)
# Try to push to Gitea (non-blocking)
subprocess.run(["git", "push"], cwd=repo_dir, capture_output=True, env={
**env,
"GIT_TERMINAL_PROMPT": "0",
})
_progress[task_id] = {
"status": "completed",
"filename": display_name,
"result": result,
"case_number": case_number,
"doc_type": doc_type,
}
except Exception as e:
logger.exception("Processing failed for %s", display_name)
_progress[task_id] = {"status": "failed", "error": str(e), "filename": display_name}
# ── Background Processing ─────────────────────────────────────────
async def _process_file(task_id: str, source: Path, req: ClassifyRequest):
"""Process a classified file in the background."""
try:
if req.category == "case":
await _process_case_document(task_id, source, req)
else:
await _process_training_document(task_id, source, req)
except Exception as e:
logger.exception("Processing failed for %s", req.filename)
_progress[task_id] = {"status": "failed", "error": str(e), "filename": req.filename}
async def _process_case_document(task_id: str, source: Path, req: ClassifyRequest):
"""Process a case document (mirrors documents.document_upload logic)."""
_progress[task_id] = {"status": "validating", "filename": req.filename}
case = await db.get_case_by_number(req.case_number)
if not case:
_progress[task_id] = {"status": "failed", "error": f"Case {req.case_number} not found"}
return
case_id = UUID(case["id"])
title = req.title or source.stem.split("_", 1)[-1] # Remove timestamp prefix
# Copy to case directory
_progress[task_id] = {"status": "copying", "filename": req.filename}
case_dir = config.find_case_dir(req.case_number) / "documents"
case_dir.mkdir(parents=True, exist_ok=True)
# Use original name without timestamp prefix
original_name = re.sub(r"^\d+_", "", source.name)
dest = case_dir / original_name
shutil.copy2(str(source), str(dest))
# Create document record
_progress[task_id] = {"status": "registering", "filename": req.filename}
doc = await db.create_document(
case_id=case_id,
doc_type=req.doc_type,
title=title,
file_path=str(dest),
)
# Process (extract → chunk → embed → store)
_progress[task_id] = {"status": "processing", "filename": req.filename, "step": "extracting"}
result = await processor.process_document(UUID(doc["id"]), case_id)
# Git commit
repo_dir = config.find_case_dir(req.case_number)
if repo_dir.exists():
subprocess.run(["git", "add", "."], cwd=repo_dir, capture_output=True)
doc_type_hebrew = {
"appeal": "כתב ערר", "response": "תשובה", "decision": "החלטה",
"reference": "מסמך עזר", "exhibit": "נספח",
}.get(req.doc_type, req.doc_type)
subprocess.run(
["git", "commit", "-m", f"הוספת {doc_type_hebrew}: {title}"],
cwd=repo_dir, capture_output=True,
env={"GIT_AUTHOR_NAME": "Ezer Mishpati", "GIT_AUTHOR_EMAIL": "legal@local",
"GIT_COMMITTER_NAME": "Ezer Mishpati", "GIT_COMMITTER_EMAIL": "legal@local",
"PATH": "/usr/bin:/bin"},
)
# Remove from uploads
source.unlink(missing_ok=True)
_progress[task_id] = {
"status": "completed",
"filename": req.filename,
"result": result,
"case_number": req.case_number,
"doc_type": req.doc_type,
}
async def _process_training_document(task_id: str, source: Path, req: ClassifyRequest):
"""Process a training document (mirrors documents.document_upload_training logic)."""
from datetime import date as date_type
title = req.title or source.stem.split("_", 1)[-1]
# Copy to training directory
_progress[task_id] = {"status": "copying", "filename": req.filename}
config.TRAINING_DIR.mkdir(parents=True, exist_ok=True)
original_name = re.sub(r"^\d+_", "", source.name)
dest = config.TRAINING_DIR / original_name
shutil.copy2(str(source), str(dest))
# Extract text
_progress[task_id] = {"status": "processing", "filename": req.filename, "step": "extracting"}
text, page_count = await extractor.extract_text(str(dest))
# Parse date
d_date = None
if req.decision_date:
d_date = date_type.fromisoformat(req.decision_date)
# Add to style corpus
_progress[task_id] = {"status": "processing", "filename": req.filename, "step": "corpus"}
corpus_id = await db.add_to_style_corpus(
document_id=None,
decision_number=req.decision_number,
decision_date=d_date,
subject_categories=req.subject_categories,
full_text=text,
)
# Chunk and embed
_progress[task_id] = {"status": "processing", "filename": req.filename, "step": "chunking"}
chunks = chunker.chunk_document(text)
chunk_count = 0
if chunks:
doc = await db.create_document(
case_id=None,
doc_type="decision",
title=f"[קורפוס] {title}",
file_path=str(dest),
page_count=page_count,
)
doc_id = UUID(doc["id"])
await db.update_document(doc_id, extracted_text=text, extraction_status="completed")
_progress[task_id] = {"status": "processing", "filename": req.filename, "step": "embedding"}
texts = [c.content for c in chunks]
embs = await embeddings.embed_texts(texts, input_type="document")
chunk_dicts = [
{
"content": c.content,
"section_type": c.section_type,
"embedding": emb,
"page_number": c.page_number,
"chunk_index": c.chunk_index,
}
for c, emb in zip(chunks, embs)
]
await db.store_chunks(doc_id, None, chunk_dicts)
chunk_count = len(chunks)
# Remove from uploads
source.unlink(missing_ok=True)
_progress[task_id] = {
"status": "completed",
"filename": req.filename,
"result": {
"corpus_id": str(corpus_id),
"title": title,
"pages": page_count,
"text_length": len(text),
"chunks": chunk_count,
},
}