Files
legal-ai/web/app.py
Chaim 140a2e442d
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 32s
Add drafts & feedback tab to case page, remove global feedback page
Move draft management (export DOCX, download, upload revised version, mark
final) and chair feedback into a new "טיוטות והערות" tab on the case detail
page. Remove the standalone /feedback page and its nav link since feedback
is now case-scoped.

Also fix /api/admin/skills 500 error when Paperclip DB is unreachable by
adding a connection timeout and graceful fallback to disk-only skills.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-14 05:55:46 +00:00

2866 lines
100 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Ezer Mishpati — Web upload interface for legal documents."""
from __future__ import annotations
import asyncio
import json
import logging
import os
import re
import shutil
import subprocess
import sys
import time
from contextlib import asynccontextmanager
from pathlib import Path
from uuid import UUID, uuid4
# Allow importing legal_mcp from the MCP server source
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "mcp-server" / "src"))
import zipfile
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.responses import FileResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
import asyncpg
from legal_mcp import config
from legal_mcp.services import chunker, db, embeddings, extractor, processor, proofreader, research_md
from legal_mcp.tools import cases as cases_tools, search as search_tools, workflow as workflow_tools, drafting as drafting_tools, precedents as precedents_tools
# Import integration clients (same directory)
_web_dir = Path(__file__).resolve().parent
sys.path.insert(0, str(_web_dir.parent))
from web.gitea_client import create_repo, setup_remote_and_push
from web.paperclip_client import create_project as pc_create_project, get_project_url
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Directory (under config.DATA_DIR) where uploaded files wait until they are
# classified, moved into the corpus, or deleted.
UPLOAD_DIR = config.DATA_DIR / "uploads"
# Lowercase file suffixes (with the leading dot) accepted by /api/upload and
# listed by /api/uploads.
ALLOWED_EXTENSIONS = {".pdf", ".docx", ".rtf", ".txt", ".md"}
# Upload size limit, in bytes.
MAX_FILE_SIZE = 50 * 1024 * 1024  # 50MB
# In-memory progress tracking: task_id -> status dict. Held only in this
# process, so it is not shared between workers and is lost on restart.
_progress: dict[str, dict] = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: prepare storage on startup, release the DB pool on shutdown.

    Startup creates ``UPLOAD_DIR`` (including missing parents) and runs
    ``db.init_schema()``. Shutdown calls ``db.close_pool()``.
    """
    UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
    await db.init_schema()
    try:
        yield
    finally:
        # Close the pool even when the context is left through an exception
        # or cancellation, so pooled connections are not leaked.
        await db.close_pool()
# The FastAPI application; startup/shutdown work is delegated to `lifespan`.
app = FastAPI(title="העלאת מסמכים משפטיים", lifespan=lifespan)
# Directory holding the static front-end files, located next to this module.
STATIC_DIR = Path(__file__).parent / "static"
# ── API Endpoints ──────────────────────────────────────────────────
@app.get("/")
async def index():
    """Serve the front-end entry page (``index.html``) from the static directory."""
    entry_page = STATIC_DIR / "index.html"
    return FileResponse(entry_page)
@app.get("/design-system.css")
async def design_system_css():
    """Serve the design-system stylesheet from the static directory as ``text/css``."""
    stylesheet = STATIC_DIR / "design-system.css"
    return FileResponse(stylesheet, media_type="text/css")
@app.post("/api/upload")
async def upload_file(file: UploadFile = File(...)):
    """Store an uploaded file in the pending-uploads directory.

    The stored name is ``<unix timestamp>_<sanitized stem><lowercased suffix>``.

    Returns:
        A dict with the stored ``filename``, the client's ``original_name``
        and the ``size`` in bytes.

    Raises:
        HTTPException: 400 when the filename is missing, the suffix is not in
        ``ALLOWED_EXTENSIONS``, or the content exceeds ``MAX_FILE_SIZE``.
    """
    original_name = file.filename
    if not original_name:
        raise HTTPException(400, "No filename provided")

    # Only the (lowercased) suffix decides whether the type is accepted.
    client_path = Path(original_name)
    ext = client_path.suffix.lower()
    if ext not in ALLOWED_EXTENSIONS:
        raise HTTPException(400, f"Unsupported file type: {ext}. Allowed: {', '.join(ALLOWED_EXTENSIONS)}")

    # Keep word characters, Hebrew letters, whitespace and ".-()" only;
    # fall back to a fixed stem when nothing survives.
    safe_name = re.sub(r"[^\w\u0590-\u05FF\s.\-()]", "", client_path.stem) or "document"
    filename = f"{int(time.time())}_{safe_name}{ext}"

    # The whole body is read before its size can be checked.
    content = await file.read()
    size = len(content)
    if size > MAX_FILE_SIZE:
        raise HTTPException(400, f"File too large. Max: {MAX_FILE_SIZE // (1024*1024)}MB")

    (UPLOAD_DIR / filename).write_bytes(content)
    return {"filename": filename, "original_name": original_name, "size": size}
@app.get("/api/uploads")
async def list_uploads():
    """Return the pending uploads, newest first.

    Only regular files whose suffix is in ``ALLOWED_EXTENSIONS`` are listed.
    Each entry carries ``filename``, ``size`` (bytes) and ``uploaded_at``
    (the file's modification time). An absent upload directory yields ``[]``.
    """
    if not UPLOAD_DIR.exists():
        return []

    newest_first = sorted(
        UPLOAD_DIR.iterdir(),
        key=lambda entry: entry.stat().st_mtime,
        reverse=True,
    )
    pending = []
    for entry in newest_first:
        if not entry.is_file():
            continue
        if entry.suffix.lower() not in ALLOWED_EXTENSIONS:
            continue
        info = entry.stat()
        pending.append(
            {"filename": entry.name, "size": info.st_size, "uploaded_at": info.st_mtime}
        )
    return pending
@app.delete("/api/uploads/{filename}")
async def delete_upload(filename: str):
    """Remove a pending file from the uploads directory.

    Returns:
        ``{"deleted": filename}`` on success.

    Raises:
        HTTPException: 404 when ``filename`` is not a regular file located
        directly inside ``UPLOAD_DIR``.
    """
    path = UPLOAD_DIR / filename
    # is_file() (rather than exists()) also rejects directories: a name such
    # as ".." has UPLOAD_DIR as its lexical parent and exists, but unlink()
    # on it would raise instead of producing a clean 404.
    if not path.is_file() or not path.parent.samefile(UPLOAD_DIR):
        raise HTTPException(404, "File not found")
    path.unlink()
    return {"deleted": filename}
class ClassifyRequest(BaseModel):
    """Request body for ``POST /api/classify``."""
    # Name of a file that is currently in UPLOAD_DIR.
    filename: str
    # "training" or "case"; any other value is rejected by the endpoint.
    category: str
    # For case documents (case_number is required when category == "case")
    case_number: str = ""
    doc_type: str = "appeal"
    title: str = ""
    # For training documents
    decision_number: str = ""
    decision_date: str = ""
    subject_categories: list[str] = []
@app.post("/api/classify")
async def classify_file(req: ClassifyRequest):
    """Validate a classification request and queue the file for processing.

    Returns:
        ``{"task_id": ...}``; progress for that id is tracked in ``_progress``.

    Raises:
        HTTPException: 404 when the file is not directly inside
        ``UPLOAD_DIR``; 400 for an unknown category or a case document
        without ``case_number``.
    """
    source = UPLOAD_DIR / req.filename
    in_uploads = source.exists() and source.parent.samefile(UPLOAD_DIR)
    if not in_uploads:
        raise HTTPException(404, "File not found in uploads")

    if req.category not in ("training", "case"):
        raise HTTPException(400, "Category must be 'training' or 'case'")
    if req.category == "case" and not req.case_number:
        raise HTTPException(400, "case_number required for case documents")

    task_id = str(uuid4())
    _progress[task_id] = {"status": "queued", "filename": req.filename}
    # Processing runs as a background task; the response returns immediately.
    asyncio.create_task(_process_file(task_id, source, req))
    return {"task_id": task_id}
# ── Training Corpus: Analyze & Upload ─────────────────────────────
@app.post("/api/training/analyze")
async def training_analyze(filename: str = Form(...)):
    """Proofread an uploaded file and extract metadata for review.

    Input: filename in UPLOAD_DIR (from /api/upload).
    Output: whatever ``proofreader.analyze_file`` returns for that file.

    Raises:
        HTTPException: 404 when the file is not directly inside
        ``UPLOAD_DIR``; 500 when the analysis raises.
    """
    source = UPLOAD_DIR / filename
    if not (source.exists() and source.parent.samefile(UPLOAD_DIR)):
        raise HTTPException(404, "File not found in uploads")

    try:
        return await proofreader.analyze_file(source)
    except Exception as e:
        # Log the traceback, then surface the failure to the client as a 500.
        logger.exception("Proofread failed for %s", filename)
        raise HTTPException(500, f"Proofreading failed: {e}")
class TrainingUploadRequest(BaseModel):
    """Request body for ``POST /api/training/upload``."""
    filename: str  # name in UPLOAD_DIR
    # When non-empty, used by the endpoint to reject duplicates in style_corpus.
    decision_number: str = ""
    decision_date: str = ""  # YYYY-MM-DD
    subject_categories: list[str] = []
    # Optional display title; the background task derives one from the file
    # name when this is empty.
    title: str = ""
@app.post("/api/training/upload")
async def training_upload(req: TrainingUploadRequest):
    """Queue an uploaded file for insertion into the style corpus.

    The work itself (proofreading, corpus insert, chunking, embedding) is done
    by ``_process_proofread_training`` as a background task.

    Returns:
        ``{"task_id": ...}`` for polling progress.

    Raises:
        HTTPException: 404 when the file is not directly inside
        ``UPLOAD_DIR``; 409 when ``decision_number`` is given and already
        present in ``style_corpus``.
    """
    source = UPLOAD_DIR / req.filename
    if not (source.exists() and source.parent.samefile(UPLOAD_DIR)):
        raise HTTPException(404, "File not found in uploads")

    # Duplicate detection is only possible when a decision number was supplied.
    if req.decision_number:
        pool = await db.get_pool()
        async with pool.acquire() as conn:
            already_there = await conn.fetchval(
                "SELECT 1 FROM style_corpus WHERE decision_number = $1 LIMIT 1",
                req.decision_number,
            )
        if already_there:
            raise HTTPException(
                409,
                f"החלטה {req.decision_number} כבר קיימת בקורפוס",
            )

    task_id = str(uuid4())
    _progress[task_id] = {"status": "queued", "filename": req.filename}
    asyncio.create_task(_process_proofread_training(task_id, source, req))
    return {"task_id": task_id}
async def _process_proofread_training(
    task_id: str, source: Path, req: TrainingUploadRequest
) -> None:
    """Background task: proofread → store in corpus → chunk → embed.

    Progress is reported by overwriting ``_progress[task_id]`` before each
    step; the final entry has status "completed" (with a ``result`` dict) or
    "failed" (with the error text). Exceptions never escape this coroutine.

    Args:
        task_id: Key in ``_progress`` under which status is published.
        source: Path of the uploaded file; removed once processing succeeds.
        req: The upload request carrying the decision metadata.
    """
    from datetime import date as date_type
    try:
        # Fall back to the part of the file stem after the first underscore
        # when no explicit title was given.
        title = req.title or source.stem.split("_", 1)[-1]
        # 1. Proofread (strip Nevo additions)
        _progress[task_id] = {"status": "processing", "filename": req.filename, "step": "proofreading"}
        clean_text, stats = await proofreader.proofread(source)
        # 2. Save proofread .md to training dir (alongside original)
        _progress[task_id] = {"status": "processing", "filename": req.filename, "step": "saving"}
        training_dir = config.TRAINING_DIR
        proofread_dir = training_dir / "proofread"
        training_dir.mkdir(parents=True, exist_ok=True)
        proofread_dir.mkdir(exist_ok=True)
        # Copy original to training dir, dropping a leading "<digits>_" prefix
        # from the file name.
        original_name = re.sub(r"^\d+_", "", source.name)
        orig_dest = training_dir / original_name
        shutil.copy2(str(source), str(orig_dest))
        # Save cleaned version
        proofread_name = Path(original_name).stem + ".md"
        proofread_dest = proofread_dir / proofread_name
        proofread_dest.write_text(clean_text, encoding="utf-8")
        # 3. Parse date (an invalid ISO date raises and fails the task)
        d_date = None
        if req.decision_date:
            d_date = date_type.fromisoformat(req.decision_date)
        # 4. Add to style corpus
        _progress[task_id] = {"status": "processing", "filename": req.filename, "step": "corpus"}
        corpus_id = await db.add_to_style_corpus(
            document_id=None,
            decision_number=req.decision_number,
            decision_date=d_date,
            subject_categories=req.subject_categories,
            full_text=clean_text,
        )
        # 5. Chunk + embed
        _progress[task_id] = {"status": "processing", "filename": req.filename, "step": "chunking"}
        chunks = chunker.chunk_document(clean_text)
        chunk_count = 0
        # A documents row (and its chunks) is only created when chunking
        # produced at least one chunk.
        if chunks:
            doc = await db.create_document(
                case_id=None,
                doc_type="decision",
                title=f"[קורפוס] {title}",
                file_path=str(orig_dest),
                page_count=stats.get("pages", 0),
            )
            doc_id = UUID(doc["id"])
            await db.update_document(
                doc_id, extracted_text=clean_text, extraction_status="completed"
            )
            _progress[task_id] = {
                "status": "processing", "filename": req.filename, "step": "embedding",
            }
            texts = [c.content for c in chunks]
            embs = await embeddings.embed_texts(texts, input_type="document")
            # Pair every chunk with its embedding for storage.
            chunk_dicts = [
                {
                    "content": c.content,
                    "section_type": c.section_type,
                    "embedding": emb,
                    "page_number": c.page_number,
                    "chunk_index": c.chunk_index,
                }
                for c, emb in zip(chunks, embs)
            ]
            await db.store_chunks(doc_id, None, chunk_dicts)
            chunk_count = len(chunks)
        # 6. Cleanup upload
        source.unlink(missing_ok=True)
        _progress[task_id] = {
            "status": "completed",
            "filename": req.filename,
            "result": {
                "corpus_id": str(corpus_id),
                "title": title,
                "chars": len(clean_text),
                "chunks": chunk_count,
                "proofread_stats": stats,
            },
        }
    except Exception as e:
        # Any failure is logged and published as a "failed" status; the
        # uploaded file is left in place in that case.
        logger.exception("Training upload failed for %s", req.filename)
        _progress[task_id] = {"status": "failed", "error": str(e), "filename": req.filename}
@app.get("/api/training/patterns")
async def training_patterns():
    """Return every stored style pattern, grouped by ``pattern_type``.

    Returns:
        ``{"total": <row count>, "by_type": {type: [pattern, ...]}}`` where
        each group is ordered by descending frequency.
    """
    pool = await db.get_pool()
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            "SELECT pattern_type, pattern_text, frequency, context, examples "
            "FROM style_patterns "
            "ORDER BY pattern_type, frequency DESC"
        )

    by_type: dict[str, list] = {}
    for row in rows:
        # `examples` may arrive as a JSON string; undecodable text counts as
        # "no examples".
        sample = row["examples"]
        if isinstance(sample, str):
            try:
                sample = json.loads(sample)
            except Exception:
                sample = []
        entry = {
            "pattern_text": row["pattern_text"],
            "frequency": row["frequency"],
            "context": row["context"] or "",
            "examples": sample or [],
        }
        kind = row["pattern_type"]
        if kind not in by_type:
            by_type[kind] = []
        by_type[kind].append(entry)
    return {"total": len(rows), "by_type": by_type}
# Shared state of the (single) background style-analysis run:
#   running    - True while an analysis task is in flight
#   started_at - time.time() when the current/last run was started
#   result     - return value of the last successful run
#   error      - str() of the exception from the last failed run
_style_analysis_state = {"running": False, "started_at": None, "result": None, "error": None}
@app.post("/api/training/analyze-style")
async def training_analyze_style():
    """Kick off style analysis over the corpus. Returns immediately.

    The run's outcome is published through ``_style_analysis_state``.

    Raises:
        HTTPException: 409 when an analysis is already running.
    """
    if _style_analysis_state["running"]:
        raise HTTPException(409, "ניתוח סגנון כבר רץ")
    _style_analysis_state.update(
        {"running": True, "started_at": time.time(), "result": None, "error": None}
    )

    async def _run():
        try:
            # Imported inside the try block: a failing import is then recorded
            # as an error and the "running" flag is still cleared, instead of
            # leaving the endpoint answering 409 until the process restarts.
            from legal_mcp.services.style_analyzer import analyze_corpus

            result = await analyze_corpus()
            _style_analysis_state["result"] = result
        except Exception as e:
            logger.exception("Style analysis failed")
            _style_analysis_state["error"] = str(e)
        finally:
            _style_analysis_state["running"] = False

    asyncio.create_task(_run())
    return {"status": "started"}
@app.get("/api/training/analyze-style/status")
async def training_analyze_style_status():
    """Return a copy of the style-analysis state for polling clients.

    When a start time is recorded, ``elapsed`` (whole seconds since that
    start) is added to the copy.
    """
    snapshot = {**_style_analysis_state}
    started = snapshot["started_at"]
    if started:
        snapshot["elapsed"] = int(time.time() - started)
    return snapshot
# ── Style Report — visual dashboard data ─────────────────────────
# Hebrew display label for each chunk section_type shown in the style report.
_SECTION_TYPE_HEBREW = {
    "intro": "פתיחה",
    "facts": "רקע",
    "appellant_claims": "טענות העורר",
    "respondent_claims": "טענות המשיב",
    "legal_analysis": "דיון משפטי",
    "ruling": "הכרעה",
    "conclusion": "סוף דבר",
}
# Order in which sections are listed in the report; section types missing
# from this list are not shown by _compute_anatomy.
_SECTION_DISPLAY_ORDER = [
    "intro", "facts", "appellant_claims", "respondent_claims",
    "legal_analysis", "ruling", "conclusion",
]
def _strip_nikud(text: str) -> str:
import unicodedata
return "".join(
c for c in unicodedata.normalize("NFD", text)
if not unicodedata.combining(c)
)
def _extract_pattern_variants(pattern_text: str) -> list[str]:
"""Mirror of scripts/backfill_pattern_frequency.py logic for matching."""
alternatives = re.split(r"\s*/\s*|\s+או\s+", pattern_text)
variants: list[str] = []
for alt in alternatives:
alt = alt.strip()
if not alt:
continue
alt = re.sub(r"\[[^\]]*\]", "|", alt)
alt = re.sub(r"\.{2,}", "|", alt)
alt = alt.replace("", "|")
segments = [s.strip(" ,.:;\"'") for s in alt.split("|")]
good = [s for s in segments if len(s) >= 4]
if good:
variants.append(max(good, key=len))
return list(dict.fromkeys(variants))
async def _compute_corpus_stats(conn) -> dict:
"""Hero section: decision count, chars, subject distribution, timeline."""
stats = await conn.fetchrow(
"SELECT count(*) as n, "
" sum(length(full_text)) as total_chars, "
" avg(length(full_text))::int as avg_chars, "
" min(decision_date) as min_date, "
" max(decision_date) as max_date "
"FROM style_corpus"
)
decisions = await conn.fetch(
"SELECT decision_number, decision_date, length(full_text) as chars, "
" subject_categories "
"FROM style_corpus ORDER BY decision_date NULLS LAST"
)
# Subject distribution
from collections import Counter
subject_counter: Counter = Counter()
for d in decisions:
cats = d["subject_categories"]
if isinstance(cats, str):
try:
cats = json.loads(cats)
except Exception:
cats = []
for c in (cats or []):
subject_counter[c] += 1
# Cap at top 6 subjects, collapse rest to "אחר"
top = subject_counter.most_common(6)
other_count = sum(subject_counter.values()) - sum(c for _, c in top)
subject_distribution = [{"label": label, "count": count} for label, count in top]
if other_count > 0:
subject_distribution.append({"label": "אחר", "count": other_count})
n = stats["n"]
top_subject = top[0] if top else None
headline = (
f"קראתי {n} מההחלטות שלך. ממוצע {stats['avg_chars']:,} תווים לכל החלטה"
+ (f", הנושא הנפוץ אצלך: {top_subject[0]} ({top_subject[1]} החלטות)" if top_subject else "")
)
return {
"decision_count": n,
"total_chars": stats["total_chars"],
"avg_chars": stats["avg_chars"],
"date_range": [
str(stats["min_date"]) if stats["min_date"] else None,
str(stats["max_date"]) if stats["max_date"] else None,
],
"decisions": [
{
"number": d["decision_number"] or "",
"date": str(d["decision_date"]) if d["decision_date"] else "",
"chars": d["chars"],
"subjects": (
json.loads(d["subject_categories"])
if isinstance(d["subject_categories"], str)
else (d["subject_categories"] or [])
),
}
for d in decisions
],
"subject_distribution": subject_distribution,
"headline": headline,
}
async def _compute_anatomy(conn) -> dict:
    """Section 2: how long each section type is across the training corpus.

    Only chunks of documents whose title starts with the corpus marker and
    that have a section type are considered.

    Returns:
        ``sections`` (one entry per known section type, in display order, with
        average chars per containing document, share of all chars and number
        of documents), ``total_coverage`` (documents with any chunk) and a
        ``headline`` naming the largest section.
    """
    section_rows = await conn.fetch(
        """
        SELECT dc.section_type,
        sum(length(dc.content))::int as total_chars,
        count(distinct dc.document_id) as docs
        FROM document_chunks dc
        JOIN documents d ON dc.document_id = d.id
        WHERE d.title LIKE '[קורפוס]%'
        AND dc.section_type IS NOT NULL
        GROUP BY dc.section_type
        """
    )
    if not section_rows:
        return {
            "sections": [],
            "total_coverage": 0,
            "headline": "אין עדיין נתונים על מבנה ההחלטות",
        }

    by_type = {row["section_type"]: row for row in section_rows}
    # The share denominator covers every returned section type, including
    # ones that are not displayed.
    grand_total = sum(row["total_chars"] for row in section_rows)

    sections = []
    for key in _SECTION_DISPLAY_ORDER:
        row = by_type.get(key)
        if row is None:
            continue
        doc_count = row["docs"]
        # Average over the documents that actually contain this section.
        average = round(row["total_chars"] / doc_count) if doc_count else 0
        share = row["total_chars"] / grand_total if grand_total else 0
        sections.append({
            "type": key,
            "label": _SECTION_TYPE_HEBREW.get(key, key),
            "avg_chars": average,
            "pct": round(share, 4),
            "coverage": doc_count,
        })

    # Number of corpus documents that have any chunk at all.
    total_coverage = await conn.fetchval(
        "SELECT count(distinct dc.document_id) "
        "FROM document_chunks dc JOIN documents d ON dc.document_id=d.id "
        "WHERE d.title LIKE '[קורפוס]%'"
    )

    headline = ""
    if sections:
        biggest = max(sections, key=lambda entry: entry["pct"])
        pct_int = round(biggest["pct"] * 100)
        headline = f"{biggest['label']} הוא {pct_int}% מכל החלטה אצלך — זה המוקד שלך"

    return {
        "sections": sections,
        "total_coverage": total_coverage,
        "headline": headline,
    }
async def _compute_signature_phrases(conn) -> dict:
"""Section 3: all patterns with real frequencies, plus headline about top."""
rows = await conn.fetch(
"SELECT pattern_type, pattern_text, context, frequency, examples "
"FROM style_patterns "
"WHERE frequency > 0 "
"ORDER BY frequency DESC"
)
items = []
for r in rows:
examples = r["examples"]
if isinstance(examples, str):
try:
examples = json.loads(examples)
except Exception:
examples = []
items.append({
"type": r["pattern_type"],
"text": r["pattern_text"],
"context": r["context"] or "",
"frequency": r["frequency"],
"examples": examples or [],
})
# Total decision count for denominator
total_decisions = await conn.fetchval("SELECT count(*) FROM style_corpus")
if items:
# Pick the first item that's a relatively clean phrase, not a template
# (templates with many placeholders make bad display text)
top = None
for item in items[:5]:
text = item["text"]
placeholder_count = len(re.findall(r"\[[^\]]*\]", text))
if placeholder_count <= 1:
top = item
break
if top is None:
top = items[0]
# Clean up for display
display = re.sub(r"\[[^\]]*\]", "", top["text"])
display = re.sub(r"\s+", " ", display).strip(" .,:;\"'")
display = display.split(" / ")[0].split(" או ")[0].strip(" .,:;\"'")
if len(display) > 60:
display = display[:57] + "..."
headline = f'הפטרן האהוב עלייך: "{display}" — מופיע ב-{top["frequency"]} מתוך {total_decisions} החלטות'
else:
headline = "טרם חולצו דפוסים — הרץ ניתוח קורפוס"
return {"items": items, "total_decisions": total_decisions, "headline": headline}
async def _compute_contribution(conn) -> dict:
    """Section 4: per-decision contribution + growth curve.

    Decisions are walked in chronological order. A pattern is "new" for the
    first decision whose text contains one of its search variants and
    "confirmed" for every later decision that contains it.

    Returns:
        ``growth_curve`` (cumulative number of discovered patterns after each
        decision), ``decision_contributions`` (per-decision details),
        ``total_patterns`` (patterns found in at least one decision) and a
        ``headline``. When the corpus or the pattern table is empty, only
        empty lists and a placeholder headline are returned.
    """
    decisions = await conn.fetch(
        "SELECT id, decision_number, decision_date, full_text, "
        " length(full_text) as chars, subject_categories "
        "FROM style_corpus ORDER BY decision_date NULLS LAST, created_at"
    )
    patterns = await conn.fetch(
        "SELECT id, pattern_type, pattern_text, context "
        "FROM style_patterns WHERE frequency > 0"
    )
    if not decisions or not patterns:
        return {
            "growth_curve": [],
            "decision_contributions": [],
            "headline": "אין עדיין מספיק נתונים",
        }

    def _subjects(raw) -> list:
        # subject_categories may arrive as a JSON string; undecodable or
        # empty values count as "no subjects".
        if isinstance(raw, str):
            try:
                raw = json.loads(raw)
            except Exception:
                return []
        return raw or []

    # Normalize texts once; a NULL text is treated as empty.
    normalized_decisions = [
        (d["id"], _strip_nikud(d["full_text"] or "")) for d in decisions
    ]

    # For each pattern, find the first decision (chronologically) that
    # contains it and the full set of decisions that contain it.
    pattern_info: dict = {}  # pattern_id → {"first": decision_id, "all": set, ...}
    for p in patterns:
        variants = _extract_pattern_variants(_strip_nikud(p["pattern_text"]))
        if not variants:
            continue
        first_seen = None
        all_matches = set()
        for dec_id, text in normalized_decisions:
            if any(v in text for v in variants):
                if first_seen is None:
                    first_seen = dec_id
                all_matches.add(dec_id)
        if first_seen is not None:
            pattern_info[p["id"]] = {
                "first": first_seen,
                "all": all_matches,
                "type": p["pattern_type"],
                "text": p["pattern_text"],
                "context": p["context"] or "",
            }

    # Per-decision: which patterns are new vs confirmed
    decision_contributions = []
    growth_curve = []
    cumulative = 0
    for d in decisions:
        dec_id = d["id"]
        new_patterns = [
            info for info in pattern_info.values() if info["first"] == dec_id
        ]
        confirmed_count = sum(
            1
            for info in pattern_info.values()
            if info["first"] != dec_id and dec_id in info["all"]
        )
        # The first new pattern doubles as the decision's "highlight".
        highlight = new_patterns[0] if new_patterns else None
        number = d["decision_number"] or ""
        date_text = str(d["decision_date"]) if d["decision_date"] else ""
        decision_contributions.append({
            "decision_number": number,
            "decision_date": date_text,
            "chars": d["chars"],
            "subjects": _subjects(d["subject_categories"]),
            "new_count": len(new_patterns),
            "confirmed_count": confirmed_count,
            "new_patterns": [
                {"type": p["type"], "text": p["text"], "context": p["context"]}
                for p in new_patterns[:10]  # cap to keep payload small
            ],
            "highlight": (
                {"type": highlight["type"], "text": highlight["text"]}
                if highlight else None
            ),
        })
        # Each pattern is "new" for exactly one decision, so a running sum
        # equals the number of distinct patterns seen so far.
        cumulative += len(new_patterns)
        growth_curve.append({
            "decision_number": number,
            "date": date_text,
            "cumulative": cumulative,
        })

    # Headline: after how many decisions were at least 85% of the patterns
    # known? The threshold is rounded up (integer ceiling) so the claim is
    # never made below 85%, and it is skipped when no pattern matched at all.
    total_patterns = len(pattern_info)
    n_decisions_to_85pct = None
    if total_patterns:
        threshold = (total_patterns * 85 + 99) // 100
        for i, point in enumerate(growth_curve, 1):
            if point["cumulative"] >= threshold:
                n_decisions_to_85pct = i
                break
    if n_decisions_to_85pct:
        headline = (
            f"אחרי {n_decisions_to_85pct} החלטות כבר למדתי 85% "
            f"מהסגנון שלך — השאר מיקד וחידד את הידע"
        )
    else:
        headline = f"למדתי {total_patterns} דפוסים מ-{len(decisions)} החלטות"
    return {
        "growth_curve": growth_curve,
        "decision_contributions": decision_contributions,
        "total_patterns": total_patterns,
        "headline": headline,
    }
@app.get("/api/training/style-report")
async def training_style_report():
    """Assemble the four sections of the style-report dashboard.

    All sections are computed one after another on a single pooled
    connection.
    """
    pool = await db.get_pool()
    report = {}
    async with pool.acquire() as conn:
        report["corpus"] = await _compute_corpus_stats(conn)
        report["anatomy"] = await _compute_anatomy(conn)
        report["signature_phrases"] = await _compute_signature_phrases(conn)
        report["contribution"] = await _compute_contribution(conn)
    return report
@app.get("/api/training/compare")
async def training_compare(a: str, b: str):
    """Compare two decisions from style_corpus by ID.

    Returns side-by-side data: basic metadata, length, section breakdown,
    which patterns appear in each, shared/unique patterns.

    Raises:
        HTTPException: 400 when either id is not a valid UUID; 404 when the
        lookup does not return exactly two rows.
    """
    try:
        ida, idb = UUID(a), UUID(b)
    except ValueError:
        raise HTTPException(400, "invalid id(s)")

    pool = await db.get_pool()
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            "SELECT id, decision_number, decision_date, subject_categories, "
            " full_text, length(full_text) as chars "
            "FROM style_corpus WHERE id = ANY($1::uuid[])",
            [ida, idb],
        )
        if len(rows) != 2:
            raise HTTPException(404, "אחת ההחלטות לא נמצאה")
        by_id = {r["id"]: r for r in rows}
        row_a = by_id[ida]
        row_b = by_id[idb]
        patterns = await conn.fetch(
            "SELECT id, pattern_type, pattern_text, context "
            "FROM style_patterns WHERE frequency > 0"
        )

        # Section breakdown via document_chunks.
        # decision_number format is "NNNN/YY" but document titles are like
        # "[קורפוס] ARAR-YY-NNNN - ..." so we match on the number segment only.
        async def section_stats(corpus_row):
            nm = corpus_row["decision_number"]
            if not nm:
                return []
            # Extract the first numeric segment (e.g., "1188" from "1188/23")
            num_match = re.match(r"(\d{3,4})", nm)
            num = num_match.group(1) if num_match else nm
            rows2 = await conn.fetch(
                "SELECT dc.section_type, sum(length(dc.content))::int as chars "
                "FROM document_chunks dc JOIN documents d ON dc.document_id=d.id "
                "WHERE d.title LIKE '[קורפוס]%' "
                " AND (d.title LIKE $1 OR d.title LIKE $2) "
                " AND dc.section_type IS NOT NULL "
                "GROUP BY dc.section_type ORDER BY chars DESC",
                f"%{num}%",
                f"%{nm}%",
            )
            return [{"type": r["section_type"], "chars": r["chars"]} for r in rows2]

        sections_a = await section_stats(row_a)
        sections_b = await section_stats(row_b)

    def _variants(pt: str) -> list[str]:
        """Return the search substrings for one pattern text."""
        alts = re.split(r"\s*/\s*|\s+או\s+", pt)
        out = []
        for alt in alts:
            # Placeholders, runs of dots and the Unicode ellipsis are gaps.
            alt = re.sub(r"\[[^\]]*\]", "|", alt)
            # The ellipsis is written as an escape: the literal in the
            # reviewed source was empty, and replacing "" inserts "|" between
            # all characters, which made every pattern unmatchable.
            # NOTE(review): U+2026 is the presumed original character - confirm.
            alt = re.sub(r"\.{2,}", "|", alt).replace("\u2026", "|")
            segs = [s.strip(" ,.:;\"'") for s in alt.split("|")]
            good = [s for s in segs if len(s) >= 4]
            if good:
                out.append(max(good, key=len))
        return list(dict.fromkeys(out))

    # Pattern matching via variant extraction, on nikud-free text.
    text_a = _strip_nikud(row_a["full_text"])
    text_b = _strip_nikud(row_b["full_text"])
    in_a, in_b = [], []
    for p in patterns:
        vs = _variants(_strip_nikud(p["pattern_text"]))
        if not vs:
            continue
        entry = {
            "id": str(p["id"]),
            "type": p["pattern_type"],
            "text": p["pattern_text"],
            "context": p["context"] or "",
        }
        if any(v in text_a for v in vs):
            in_a.append(entry)
        if any(v in text_b for v in vs):
            in_b.append(entry)

    set_a = {p["id"] for p in in_a}
    set_b = {p["id"] for p in in_b}
    shared_ids = set_a & set_b
    only_a_ids = set_a - set_b
    only_b_ids = set_b - set_a

    def serialize(row, sections, patterns_list):
        """Build the per-decision part of the response."""
        cats = row["subject_categories"]
        if isinstance(cats, str):
            try:
                cats = json.loads(cats)
            except Exception:
                cats = []
        return {
            "id": str(row["id"]),
            "decision_number": row["decision_number"] or "",
            "decision_date": str(row["decision_date"]) if row["decision_date"] else "",
            "chars": row["chars"],
            "subjects": cats or [],
            "sections": sections,
            "patterns_count": len(patterns_list),
        }

    return {
        "a": serialize(row_a, sections_a, in_a),
        "b": serialize(row_b, sections_b, in_b),
        "shared": [p for p in in_a if p["id"] in shared_ids],
        "only_a": [p for p in in_a if p["id"] in only_a_ids],
        "only_b": [p for p in in_b if p["id"] in only_b_ids],
    }
@app.delete("/api/training/corpus/{corpus_id}")
async def training_corpus_delete(corpus_id: str):
    """Delete one decision from the style corpus.

    Returns:
        The result dict of ``db.delete_from_style_corpus`` when it reports
        ``deleted``.

    Raises:
        HTTPException: 400 when ``corpus_id`` is not a valid UUID; 404 (with
        the reported reason) when nothing was deleted.
    """
    try:
        cid = UUID(corpus_id)
    except ValueError:
        raise HTTPException(400, "invalid corpus_id")

    outcome = await db.delete_from_style_corpus(cid)
    if outcome.get("deleted"):
        return outcome
    raise HTTPException(404, outcome.get("reason", "not found"))
@app.get("/api/training/corpus")
async def training_corpus_list():
    """Return every decision in the style corpus, most recently added first.

    Each entry has ``id``, ``decision_number``, ``decision_date``,
    ``subject_categories``, ``chars`` (length of the full text) and
    ``created_at`` (ISO format); missing values become empty strings/lists.
    """
    pool = await db.get_pool()
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            "SELECT id, decision_number, decision_date, subject_categories, "
            " length(full_text) as chars, created_at "
            "FROM style_corpus "
            "ORDER BY created_at DESC"
        )

    listing = []
    for row in rows:
        # subject_categories may arrive as a JSON string.
        categories = row["subject_categories"]
        if isinstance(categories, str):
            categories = json.loads(categories)
        else:
            categories = categories or []
        decided = row["decision_date"]
        created = row["created_at"]
        listing.append({
            "id": str(row["id"]),
            "decision_number": row["decision_number"] or "",
            "decision_date": str(decided) if decided else "",
            "subject_categories": categories,
            "chars": row["chars"],
            "created_at": created.isoformat() if created else "",
        })
    return listing
def _get_active_tasks() -> list[dict]:
    """Return a summary of every task in ``_progress`` that has not finished.

    Tasks whose status is "completed" or "failed" are left out; a missing
    status is reported as "unknown".
    """
    terminal = ("completed", "failed")
    # Iterate over a copy so concurrent updates cannot disturb the loop.
    return [
        {
            "task_id": task_id,
            "status": data.get("status", "unknown"),
            "step": data.get("step", ""),
            "filename": data.get("filename", ""),
            "error": data.get("error", ""),
        }
        for task_id, data in list(_progress.items())
        if data.get("status", "unknown") not in terminal
    ]
@app.get("/api/system/tasks")
async def system_tasks():
    """Return a one-shot snapshot of the active background tasks and their count."""
    active = _get_active_tasks()
    return {"active": active, "count": len(active)}
@app.get("/api/system/tasks/stream")
async def system_tasks_stream():
    """SSE stream — pushes active-task snapshots when anything changes.

    Replaces client-side polling. Clients connect once and receive a
    ``tasks`` event whenever the task set changes. A comment-line heartbeat
    is sent after 15s without any output to keep proxies from timing out.
    """
    async def event_gen():
        last_snapshot: str | None = None
        last_heartbeat = time.time()
        # The first iteration always differs from None, so the initial state
        # is emitted immediately.
        while True:
            # Build the snapshot from a single evaluation of the task list
            # (it was previously computed twice per tick).
            active = _get_active_tasks()
            snapshot = json.dumps(
                {"active": active, "count": len(active)},
                ensure_ascii=False,
            )
            now = time.time()
            if snapshot != last_snapshot:
                yield f"event: tasks\ndata: {snapshot}\n\n"
                last_snapshot = snapshot
                last_heartbeat = now
            elif now - last_heartbeat > 15:
                yield ": heartbeat\n\n"
                last_heartbeat = now
            await asyncio.sleep(1)

    return StreamingResponse(event_gen(), media_type="text/event-stream")
@app.get("/api/progress/{task_id}")
async def progress_stream(task_id: str):
    """SSE stream of processing progress.

    Emits the task's status dict once per second until it reaches
    "completed" or "failed". The entry is removed from ``_progress`` 30
    seconds after a terminal status was observed.

    Raises:
        HTTPException: 404 when ``task_id`` is unknown.
    """
    if task_id not in _progress:
        raise HTTPException(404, "Task not found")

    async def event_stream():
        while True:
            data = _progress.get(task_id, {})
            finished = data.get("status") in ("completed", "failed")
            if finished:
                # Schedule the cleanup on the event loop before the final
                # event is sent. Doing it inside this generator after a sleep
                # meant a client disconnect cancelled the generator and the
                # entry was never removed.
                asyncio.get_running_loop().call_later(
                    30, _progress.pop, task_id, None
                )
            yield f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
            if finished:
                break
            await asyncio.sleep(1)
        # Keep the connection open for the same grace period as before, so
        # the stream's lifetime is unchanged for clients that stay connected.
        await asyncio.sleep(30)

    return StreamingResponse(event_stream(), media_type="text/event-stream")
@app.get("/health")
@app.get("/api/health")
async def health():
    """Liveness probe; always answers with a static "ok" payload."""
    payload = {"status": "ok"}
    return payload
@app.get("/api/cases")
async def list_cases(detail: bool = False):
    """List existing cases. With detail=true, includes doc counts and integration URLs.

    Args:
        detail: When false, only ``case_number``, ``title`` and ``status``
            are returned per case.

    Returns:
        A list with one dict per case.
    """
    cases = await db.list_cases()
    if not detail:
        return [
            {"case_number": c["case_number"], "title": c["title"], "status": c["status"]}
            for c in cases
        ]
    if not cases:
        return []

    # Enhanced listing with document counts. Both counts for all cases come
    # from one grouped query instead of two queries per case.
    case_ids = [UUID(c["id"]) for c in cases]
    pool = await db.get_pool()
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            "SELECT case_id, count(*) AS total, "
            " count(*) FILTER ("
            "   WHERE extraction_status NOT IN ('completed', 'proofread')"
            " ) AS processing "
            "FROM documents WHERE case_id = ANY($1::uuid[]) "
            "GROUP BY case_id",
            case_ids,
        )
    # Keyed by the canonical string form so the lookup does not depend on the
    # UUID type the driver returns. Cases without documents have no row.
    counts = {str(r["case_id"]): (r["total"], r["processing"]) for r in rows}

    result = []
    for c, case_id in zip(cases, case_ids):
        doc_count, processing_count = counts.get(str(case_id), (0, 0))
        result.append({
            "case_number": c["case_number"],
            "title": c["title"],
            "status": c["status"],
            "expected_outcome": c.get("expected_outcome", ""),
            "committee_type": c.get("committee_type", ""),
            "hearing_date": str(c["hearing_date"]) if c.get("hearing_date") else "",
            "document_count": doc_count,
            "processing_count": processing_count,
            "gitea_url": f"https://gitea.nautilus.marcusgroup.org/cases/{c['case_number']}",
        })
    return result
# ── Paperclip Integration API ─────────────────────────────────────
class CaseCreateRequest(BaseModel):
    """Request body for ``POST /api/cases/create``.

    Every field is forwarded unchanged to ``cases_tools.case_create``.
    """
    case_number: str
    title: str
    appellants: list[str] | None = None
    respondents: list[str] | None = None
    subject: str = ""
    property_address: str = ""
    permit_number: str = ""
    committee_type: str = "ועדה מקומית"
    hearing_date: str = ""
    notes: str = ""
    expected_outcome: str = ""
    practice_area: str = "appeals_committee"
    # Also used by the endpoint as the Paperclip project's appeal type; when
    # empty the endpoint substitutes its own default.
    appeal_subtype: str = ""
class CaseUpdateRequest(BaseModel):
    """Request body for ``PUT /api/cases/{case_number}``.

    Every field is forwarded unchanged to ``cases_tools.case_update``.
    """
    # NOTE(review): empty strings / None presumably mean "leave unchanged" -
    # confirm against cases_tools.case_update.
    status: str = ""
    title: str = ""
    subject: str = ""
    notes: str = ""
    hearing_date: str = ""
    decision_date: str = ""
    tags: list[str] | None = None
    expected_outcome: str = ""
@app.post("/api/cases/create")
async def api_case_create(req: CaseCreateRequest):
    """Create a new appeal case and, best effort, its Paperclip project.

    Returns:
        The decoded reply of ``cases_tools.case_create``, extended with
        ``paperclip`` (project info) or ``paperclip_error`` (failure text).

    Raises:
        HTTPException: 400 when the tool's reply is not JSON; the reply text
        is used as the error detail.
    """
    result = await cases_tools.case_create(
        case_number=req.case_number,
        title=req.title,
        appellants=req.appellants,
        respondents=req.respondents,
        subject=req.subject,
        property_address=req.property_address,
        permit_number=req.permit_number,
        committee_type=req.committee_type,
        hearing_date=req.hearing_date,
        notes=req.notes,
        expected_outcome=req.expected_outcome,
        practice_area=req.practice_area,
        appeal_subtype=req.appeal_subtype,
    )
    # The other case endpoints in this module treat a non-JSON tool reply as
    # an error message; do the same here instead of failing with an unhandled
    # decode error, and do not create a Paperclip project in that case.
    try:
        parsed = json.loads(result)
    except json.JSONDecodeError:
        raise HTTPException(400, result)

    # Auto-create Paperclip project for the new case. A failure here is
    # reported in the response but does not fail the request.
    appeal_type = req.appeal_subtype or "רישוי"
    try:
        pc_result = await pc_create_project(
            case_number=req.case_number,
            title=req.title,
            appeal_type=appeal_type,
        )
        parsed["paperclip"] = pc_result
        logger.info("Auto-created Paperclip project for case %s: %s", req.case_number, pc_result.get("url"))
    except Exception as e:
        logger.warning("Failed to auto-create Paperclip project for case %s: %s", req.case_number, e)
        parsed["paperclip_error"] = str(e)
    return parsed
@app.get("/api/cases/{case_number}/details")
async def api_case_get(case_number: str):
    """Return the full record of one case, documents included.

    A tool reply that is not valid JSON is raised as a 404 carrying that
    reply as the detail.
    """
    raw = await cases_tools.case_get(case_number)
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError:
        raise HTTPException(404, raw)
    else:
        return payload
@app.put("/api/cases/{case_number}")
async def api_case_update(case_number: str, req: CaseUpdateRequest):
    """Apply the submitted field values to an existing case.

    The tool reply is returned as parsed JSON; a reply that is not valid
    JSON is raised as a 404 with that reply as the detail.
    """
    changes = {
        "status": req.status,
        "title": req.title,
        "subject": req.subject,
        "notes": req.notes,
        "hearing_date": req.hearing_date,
        "decision_date": req.decision_date,
        "tags": req.tags,
        "expected_outcome": req.expected_outcome,
    }
    raw = await cases_tools.case_update(case_number=case_number, **changes)
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError:
        raise HTTPException(404, raw)
    else:
        return payload
@app.delete("/api/cases")
async def api_case_delete(case_number: str, remove_files: bool = False):
    """Delete the case named by the ``case_number`` query parameter.

    The case number travels in the query string rather than the path because
    case numbers may contain characters such as `/` that FastAPI path routing
    cannot capture even when URL-encoded (%2F). Dependent
    documents/chunks/qa_results cascade via FK ON DELETE CASCADE; audit_log
    rows nullify their case_id.

    Pass `remove_files=true` to also rm -rf the on-disk case directory.
    """
    outcome = json.loads(await cases_tools.case_delete(case_number, remove_files))
    if outcome.get("deleted"):
        return outcome
    raise HTTPException(404, outcome.get("reason", f"תיק {case_number} לא נמצא"))
@app.get("/api/cases/{case_number}/status")
async def api_case_status(case_number: str):
    """Return the complete workflow status of a case.

    A tool reply that is not valid JSON is raised as a 404 with that reply
    as the detail.
    """
    raw = await workflow_tools.workflow_status(case_number)
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError:
        raise HTTPException(404, raw)
    else:
        return payload
@app.get("/api/search")
async def api_search(query: str, limit: int = 10, section_type: str = ""):
    """Run a semantic search over decisions and documents.

    Returns the parsed JSON reply, or ``{"message": <reply>}`` when the tool
    answered with plain text.
    """
    raw = await search_tools.search_decisions(query, limit, section_type)
    try:
        hits = json.loads(raw)
    except json.JSONDecodeError:
        return {"message": raw}
    else:
        return hits
@app.get("/api/cases/{case_number}/search")
async def api_case_search(case_number: str, query: str, limit: int = 10):
    """Run a semantic search restricted to one case's documents.

    Returns the parsed JSON reply, or ``{"message": <reply>}`` when the tool
    answered with plain text.
    """
    raw = await search_tools.search_case_documents(case_number, query, limit)
    try:
        hits = json.loads(raw)
    except json.JSONDecodeError:
        return {"message": raw}
    else:
        return hits
@app.get("/api/cases/{case_number}/template")
async def api_case_template(case_number: str):
    """Return the outcome-aware decision template for a case.

    A reply that begins with "תיק" is raised as a 404 with the reply as the
    detail; any other reply is returned under the ``"template"`` key.
    """
    template = await drafting_tools.get_decision_template(case_number)
    if not template.startswith("תיק"):
        return {"template": template}
    raise HTTPException(404, template)
@app.get("/api/processing-status")
async def api_processing_status():
    """Return the overall processing status as parsed JSON."""
    raw = await workflow_tools.processing_status()
    status = json.loads(raw)
    return status
@app.get("/api/system/diagnostics")
async def system_diagnostics():
    """Report a system health snapshot.

    The response holds: whether a trivial query succeeded (``db_ok``), row
    counts for a fixed set of tables (``None`` where counting failed), the 20
    most recent documents whose extraction failed, the 20 most recent
    documents still pending/processing more than ten minutes after creation,
    and the in-memory upload tasks that are neither completed nor failed.
    """
    counted_tables = ("cases", "documents", "document_chunks", "style_corpus", "style_patterns")
    # Shared head of the two document queries below.
    doc_select = (
        "SELECT d.id, d.title, d.extraction_status, d.created_at, "
        " c.case_number "
        "FROM documents d LEFT JOIN cases c ON d.case_id = c.id "
    )

    def _as_entry(row):
        created = row["created_at"]
        return {
            "id": str(row["id"]),
            "title": row["title"] or "",
            "status": row["extraction_status"],
            "case_number": row["case_number"] or "",
            "created_at": created.isoformat() if created else None,
        }

    pool = await db.get_pool()
    async with pool.acquire() as conn:
        try:
            await conn.fetchval("SELECT 1")
        except Exception:
            db_ok = False
        else:
            db_ok = True

        tables = {}
        for table in counted_tables:
            try:
                row_count = await conn.fetchval(f"SELECT count(*) FROM {table}")
            except Exception:
                row_count = None
            tables[table] = row_count

        # Documents that failed extraction or are stuck
        failed_docs = await conn.fetch(
            doc_select
            + "WHERE d.extraction_status IN ('failed', 'error') "
            "ORDER BY d.created_at DESC LIMIT 20"
        )
        stuck_docs = await conn.fetch(
            doc_select
            + "WHERE d.extraction_status IN ('pending', 'processing') "
            " AND d.created_at < now() - interval '10 minutes' "
            "ORDER BY d.created_at DESC LIMIT 20"
        )

    active_tasks = []
    for task_id, info in _progress.items():
        if info.get("status") in ("completed", "failed"):
            continue
        active_tasks.append({
            "task_id": task_id,
            "filename": info.get("filename", ""),
            "status": info.get("status", ""),
            "step": info.get("step", ""),
        })

    return {
        "db_ok": db_ok,
        "tables": tables,
        "failed_documents": [_as_entry(row) for row in failed_docs],
        "stuck_documents": [_as_entry(row) for row in stuck_docs],
        "active_tasks": active_tasks,
    }
@app.get("/api/system/recent-activity")
async def system_recent_activity(limit: int = 8):
    """Build a feed of recent events from cases, style_corpus and style_patterns.

    Up to ``limit`` newest cases and ``limit`` newest corpus entries are
    collected, plus one entry for the latest style-pattern row when there is
    one. Every event carries: type, label, detail, timestamp, target. The
    merged list is ordered newest first and cut to ``limit`` entries.
    """
    def _stamp(moment):
        return moment.isoformat() if moment else None

    pool = await db.get_pool()
    async with pool.acquire() as conn:
        # Recent cases
        case_rows = await conn.fetch(
            "SELECT case_number, title, created_at FROM cases "
            "ORDER BY created_at DESC LIMIT $1", limit
        )
        # Recent corpus additions
        corpus_rows = await conn.fetch(
            "SELECT decision_number, created_at FROM style_corpus "
            "ORDER BY created_at DESC LIMIT $1", limit
        )
        # Last style analysis run (if any)
        newest_pattern = await conn.fetchrow(
            "SELECT created_at FROM style_patterns "
            "ORDER BY created_at DESC LIMIT 1"
        )
        has_analysis = bool(newest_pattern and newest_pattern["created_at"])
        pattern_total = None
        if has_analysis:
            pattern_total = await conn.fetchval("SELECT count(*) FROM style_patterns")

    events: list[dict] = [
        {
            "type": "case_created",
            "label": f"תיק חדש: ערר {row['case_number']}",
            "detail": row["title"] or "",
            "timestamp": _stamp(row["created_at"]),
            "target": f"/#/case/{row['case_number']}",
        }
        for row in case_rows
    ]
    events.extend(
        {
            "type": "corpus_added",
            "label": f"החלטה נוספה לקורפוס: {row['decision_number'] or 'ללא מספר'}",
            "detail": "",
            "timestamp": _stamp(row["created_at"]),
            "target": "/#/training",
        }
        for row in corpus_rows
    )
    if has_analysis:
        events.append({
            "type": "analysis_run",
            "label": f"ניתוח סגנון — {pattern_total} דפוסים חולצו",
            "detail": "",
            "timestamp": newest_pattern["created_at"].isoformat(),
            "target": "/#/style-report",
        })

    # Newest first (missing timestamps sort last), then keep the top N
    newest_first = sorted(events, key=lambda ev: ev["timestamp"] or "", reverse=True)
    return {"events": newest_first[:limit]}
# ── Workflow API — outcome, direction, claims, QA, learning ──────
class OutcomeRequest(BaseModel):
    # Request body for POST /api/cases/{case_number}/outcome.
    outcome: str  # rejection / full_acceptance / partial_acceptance (not validated by this model)
    # Optional; when non-empty, api_set_outcome moves the case to
    # "direction_approved" instead of "outcome_set".
    reasoning: str = ""
class DirectionRequest(BaseModel):
    # Request body for POST /api/cases/{case_number}/direction; the dict is
    # serialized with json.dumps before being stored on the decision row.
    direction_doc: dict  # JSON document with main_reasoning, reasoning_order, key_precedents, notes
@app.post("/api/cases/{case_number}/outcome")
async def api_set_outcome(case_number: str, req: OutcomeRequest):
    """Record the decision outcome for a case, with optional reasoning.

    Updates the case's existing decision row, or inserts a version-1 draft
    authored as 'דפנה תמיר' when none exists, then stores the outcome on the
    case and advances its status: ``direction_approved`` when reasoning was
    supplied, ``outcome_set`` otherwise.

    Raises:
        HTTPException: 404 when the case does not exist.
    """
    case = await db.get_case_by_number(case_number)
    if not case:
        raise HTTPException(404, f"תיק {case_number} לא נמצא")
    case_id = UUID(case["id"])
    new_status = "direction_approved" if req.reasoning else "outcome_set"

    pool = await db.get_pool()
    async with pool.acquire() as conn:
        # One transaction: the decision row and the case row must change
        # together, otherwise a failure between the two statements leaves the
        # case status disagreeing with its decision.
        async with conn.transaction():
            # Update or create decision record
            existing = await conn.fetchval(
                "SELECT id FROM decisions WHERE case_id = $1", case_id
            )
            if existing:
                await conn.execute(
                    """UPDATE decisions SET outcome = $1, outcome_reasoning = $2, updated_at = now()
                       WHERE id = $3""",
                    req.outcome, req.reasoning, existing,
                )
            else:
                await conn.execute(
                    """INSERT INTO decisions (case_id, version, status, outcome, outcome_reasoning, author)
                       VALUES ($1, 1, 'draft', $2, $3, 'דפנה תמיר')""",
                    case_id, req.outcome, req.reasoning,
                )
            # Update case status
            await conn.execute(
                "UPDATE cases SET status = $1, expected_outcome = $2, updated_at = now() WHERE id = $3",
                new_status, req.outcome, case_id,
            )
    return {"status": new_status, "outcome": req.outcome, "has_reasoning": bool(req.reasoning)}
@app.get("/api/cases/{case_number}/claims")
async def api_get_claims(case_number: str):
    """Return a case's extracted claims, bucketed by party role.

    Claims are read ordered by party role and claim index. The response also
    carries the case number and the total number of claims. Responds 404 when
    the case does not exist.
    """
    case = await db.get_case_by_number(case_number)
    if not case:
        raise HTTPException(404, f"תיק {case_number} לא נמצא")

    claims_sql = (
        "SELECT party_role, claim_text, claim_index, source_document, addressed_in_paragraph\n"
        "FROM claims WHERE case_id = $1 ORDER BY party_role, claim_index"
    )
    pool = await db.get_pool()
    async with pool.acquire() as conn:
        rows = await conn.fetch(claims_sql, UUID(case["id"]))

    claims_by_party = {}
    for row in rows:
        claims_by_party.setdefault(row["party_role"], []).append(dict(row))
    return {"case_number": case_number, "claims": claims_by_party, "total": len(rows)}
@app.post("/api/cases/{case_number}/direction")
async def api_set_direction(case_number: str, req: DirectionRequest):
    """Save the approved direction document for the discussion block.

    Stores ``req.direction_doc`` (JSON-encoded) on the case's decision row(s)
    and moves the case to ``direction_approved``. Both writes happen in one
    transaction.

    Raises:
        HTTPException: 404 when the case does not exist, or when the case has
            no decision row to hold the direction document.
    """
    case = await db.get_case_by_number(case_number)
    if not case:
        raise HTTPException(404, f"תיק {case_number} לא נמצא")
    case_id = UUID(case["id"])

    pool = await db.get_pool()
    async with pool.acquire() as conn:
        async with conn.transaction():
            updated = await conn.fetch(
                """UPDATE decisions SET direction_doc = $1, updated_at = now()
                   WHERE case_id = $2
                   RETURNING id""",
                json.dumps(req.direction_doc, ensure_ascii=False),
                case_id,
            )
            if not updated:
                # Without a decision row the UPDATE matches nothing; refuse
                # rather than report success while discarding the document.
                # Raising here also rolls the transaction back.
                raise HTTPException(404, "אין החלטה לתיק זה")
            await conn.execute(
                "UPDATE cases SET status = 'direction_approved', updated_at = now() WHERE id = $1",
                case_id,
            )
    return {"status": "direction_approved", "direction_doc": req.direction_doc}
def _compute_qa_checks(blocks, claims):
    """Build the list of QA check entries from decision blocks and case claims.

    Each entry has ``check_name``, ``passed``, ``severity``, ``errors`` (a
    JSON-encoded list) and ``details``.
    """
    checks = []

    # Check 1: claims coverage — every claim must be tied to a paragraph
    unanswered = [c for c in claims if c["addressed_in_paragraph"] is None]
    checks.append({
        "check_name": "claims_coverage",
        "passed": not unanswered,
        "severity": "critical",
        # `or ""` keeps a NULL claim_text from crashing the slice
        "errors": json.dumps(
            [{"claim": (c["claim_text"] or "")[:80]} for c in unanswered],
            ensure_ascii=False,
        ),
        "details": f"{len(claims) - len(unanswered)}/{len(claims)} טענות נענו",
    })

    # Check 2: block weights — share of all words held by block-yod
    total_words = sum(b["word_count"] for b in blocks)
    yod = next((b for b in blocks if b["block_id"] == "block-yod"), None)
    yod_pct = (yod["word_count"] / total_words * 100) if yod and total_words > 0 else 0
    checks.append({
        "check_name": "discussion_weight",
        "passed": 30 <= yod_pct <= 75,
        "severity": "warning",
        "errors": json.dumps([]),
        "details": f"בלוק דיון: {yod_pct:.1f}% (טווח: 30-75%)",
    })

    # Check 3: neutral background — block-vav must not contain judgmental words
    vav = next((b for b in blocks if b["block_id"] == "block-vav"), None)
    bad_words = ["חריג", "חטא", "בעייתי", "מזעזע", "שערורייתי", "מגוחך", "נפשע", "פגום"]
    found_bad = []
    if vav and vav["content"]:
        found_bad = [word for word in bad_words if word in vav["content"]]
    checks.append({
        "check_name": "neutral_background",
        "passed": not found_bad,
        "severity": "critical",
        "errors": json.dumps(found_bad, ensure_ascii=False),
        "details": "תקין" if not found_bad else f"נמצאו מילות שיפוט: {found_bad}",
    })

    # Check 4: sequential numbering — placeholder that always passes
    checks.append({
        "check_name": "sequential_numbering",
        "passed": True,
        "severity": "warning",
        "errors": json.dumps([]),
        "details": "בדיקה בסיסית עברה",
    })
    return checks


@app.post("/api/cases/{case_number}/qa")
async def api_run_qa(case_number: str):
    """Run QA validation on a drafted decision and store the results.

    Reads the decision's non-empty blocks and the case's claims, computes the
    checks, then — in a single transaction — replaces the decision's stored
    QA results and sets the case status to ``drafted`` when every critical
    check passed, ``qa_review`` otherwise.

    Raises:
        HTTPException: 404 when the case or its decision does not exist.
    """
    case = await db.get_case_by_number(case_number)
    if not case:
        raise HTTPException(404, f"תיק {case_number} לא נמצא")
    case_id = UUID(case["id"])

    pool = await db.get_pool()
    async with pool.acquire() as conn:
        decision = await conn.fetchrow(
            "SELECT id FROM decisions WHERE case_id = $1", case_id
        )
        if not decision:
            raise HTTPException(404, "אין החלטה לתיק זה")
        decision_id = decision["id"]

        blocks = await conn.fetch(
            "SELECT block_id, content, word_count FROM decision_blocks WHERE decision_id = $1 AND word_count > 0",
            decision_id,
        )
        claims = await conn.fetch(
            "SELECT id, claim_text, addressed_in_paragraph FROM claims WHERE case_id = $1",
            case_id,
        )

        checks = _compute_qa_checks(blocks, claims)
        # Only critical checks decide the outcome; warnings are informational.
        all_passed = all(c["passed"] for c in checks if c["severity"] == "critical")
        new_status = "drafted" if all_passed else "qa_review"

        # Replace the stored results atomically so a failure midway cannot
        # leave the decision with its previous results deleted and no new ones.
        async with conn.transaction():
            await conn.execute("DELETE FROM qa_results WHERE decision_id = $1", decision_id)
            for check in checks:
                await conn.execute(
                    """INSERT INTO qa_results (decision_id, case_id, check_name, passed, severity, errors, details)
                       VALUES ($1, $2, $3, $4, $5, $6, $7)""",
                    decision_id, case_id, check["check_name"], check["passed"],
                    check["severity"], check["errors"], check["details"],
                )
            await conn.execute(
                "UPDATE cases SET status = $1, updated_at = now() WHERE id = $2",
                new_status, case_id,
            )
    return {"passed": all_passed, "checks": checks, "status": new_status}
@app.post("/api/cases/{case_number}/learn")
async def api_learn(case_number: str):
    """Trigger the learning loop for a case.

    Currently this only sets the case status to ``final``. Responds 404 when
    the case does not exist.
    """
    case = await db.get_case_by_number(case_number)
    if not case:
        raise HTTPException(404, f"תיק {case_number} לא נמצא")

    # For now, mark as final and log
    case_id = UUID(case["id"])
    pool = await db.get_pool()
    async with pool.acquire() as conn:
        await conn.execute(
            "UPDATE cases SET status = 'final', updated_at = now() WHERE id = $1",
            case_id,
        )
    return {"status": "final", "message": "לולאת למידה הופעלה — גרסה סופית נקלטה"}
# ── Local files API — research, drafts, proofread ──
@app.get("/api/cases/{case_number}/local-files")
async def api_local_files(case_number: str):
    """List the on-disk research, proofread and drafts files of a case.

    ``research`` and ``proofread`` live under ``<case>/documents``; ``drafts``
    sits directly under the case directory. Hidden files (leading dot) and
    non-files are skipped, and a folder appears in the result only when it
    holds at least one listed file.
    """
    def _visible_files(directory, label):
        entries = []
        for item in sorted(directory.iterdir()):
            if not item.is_file() or item.name.startswith("."):
                continue
            info = item.stat()
            entries.append({
                "filename": item.name,
                "size": info.st_size,
                "modified_at": info.st_mtime,
                "folder": label,
            })
        return entries

    case_dir = config.find_case_dir(case_number)
    locations = [(name, case_dir / "documents" / name) for name in ("research", "proofread")]
    # Drafts are at case level, not under documents
    locations.append(("drafts", case_dir / "drafts"))

    listing = {}
    for label, directory in locations:
        if not directory.exists():
            continue
        found = _visible_files(directory, label)
        if found:
            listing[label] = found
    return listing
@app.get("/api/cases/{case_number}/local-files/{folder}/{filename}")
async def api_read_local_file(case_number: str, folder: str, filename: str):
    """Serve one local case file as UTF-8 plain text.

    ``folder`` must be research, proofread or drafts (400 otherwise); a
    missing path or a non-file answers 404.
    """
    allowed_folders = ("research", "proofread", "drafts")
    if folder not in allowed_folders:
        raise HTTPException(400, "Invalid folder")

    case_dir = config.find_case_dir(case_number)
    # drafts sit at case level; the other folders live under documents/
    parent = case_dir / "drafts" if folder == "drafts" else case_dir / "documents" / folder
    path = parent / filename
    if not (path.exists() and path.is_file()):
        raise HTTPException(404, "קובץ לא נמצא")
    return FileResponse(path, media_type="text/plain; charset=utf-8", filename=filename)
# ── Research analysis (analysis-and-research.md) — parse + edit ────
def _research_file_path(case_number: str) -> Path:
    """Return where a case's analysis-and-research.md lives (it may not exist)."""
    return config.find_case_dir(case_number).joinpath(
        "documents", "research", "analysis-and-research.md"
    )
@app.get("/api/cases/{case_number}/research/analysis")
async def api_research_analysis(case_number: str):
    """Return the parsed structure of analysis-and-research.md for the UI.

    Answers 404 when the file is missing and 500 (after logging the
    traceback) when parsing raises.
    """
    path = _research_file_path(case_number)
    if not path.exists():
        raise HTTPException(404, "טרם בוצע ניתוח משפטי לתיק זה")
    try:
        structure = research_md.parse(path)
    except Exception as e:
        logger.exception("Failed to parse %s", path)
        raise HTTPException(500, f"שגיאה בעיבוד הקובץ: {e}")
    else:
        return structure
@app.get("/api/cases/{case_number}/research/analysis/download")
async def api_research_analysis_download(case_number: str):
    """Send the raw analysis-and-research.md, named ``analysis-<case>.md``.

    Answers 404 when the file does not exist.
    """
    path = _research_file_path(case_number)
    if path.exists():
        download_name = f"analysis-{case_number}.md"
        return FileResponse(
            path,
            media_type="text/markdown; charset=utf-8",
            filename=download_name,
        )
    raise HTTPException(404, "טרם בוצע ניתוח משפטי לתיק זה")
@app.put("/api/cases/{case_number}/research/analysis/upload")
async def api_research_analysis_upload(
    case_number: str,
    file: UploadFile = File(...),
):
    """Replace a case's analysis-and-research.md with an uploaded file.

    The upload is accepted only when:
    1. its name ends with ``.md``, it is at most 5MB, decodes as UTF-8 and
       has at least 100 non-blank-trimmed characters
    2. the research_md parser can parse it
    3. it contains at least one structural section (issues or threshold_claims)
    4. a case number found in its header matches the URL

    Any violation answers 400. On success the current file, if present, is
    copied into a timestamped ``backup/`` entry and the upload takes its place.
    """
    upload_name = file.filename
    if not (upload_name and upload_name.endswith(".md")):
        raise HTTPException(400, "הקובץ חייב להיות בפורמט Markdown (.md)")

    content = await file.read()
    size_cap = 5 * 1024 * 1024
    if len(content) > size_cap:
        raise HTTPException(400, "הקובץ גדול מדי — מקסימום 5MB")
    try:
        text = content.decode("utf-8")
    except UnicodeDecodeError:
        raise HTTPException(400, "הקובץ חייב להיות בקידוד UTF-8")
    if len(text.strip()) < 100:
        raise HTTPException(400, "הקובץ ריק מדי — נראה שחסר תוכן")

    # Stage the text next to the destination so parse() has a real file to read
    dest = _research_file_path(case_number)
    tmp = dest.with_suffix(".md.upload-tmp")

    def _reject(detail):
        # Drop the staged file and hand back the 400 to raise.
        tmp.unlink(missing_ok=True)
        return HTTPException(400, detail)

    try:
        dest.parent.mkdir(parents=True, exist_ok=True)
        tmp.write_text(text, encoding="utf-8")
        parsed = research_md.parse(tmp)
    except Exception as e:
        raise _reject(f"שגיאה בפרסור הקובץ — המבנה לא תקין: {e}")

    # Structure: at least one threshold claim or issue
    issues = parsed.get("issues", [])
    thresholds = parsed.get("threshold_claims", [])
    if not (issues or thresholds):
        raise _reject("הקובץ חייב להכיל לפחות סעיף אחד של טענות סף או סוגיות להכרעה")

    # Case number in the file header, when present, must match the URL
    file_case = parsed.get("header", {}).get("case_number", "")
    if file_case and file_case != case_number:
        raise _reject(f"מספר התיק בקובץ ({file_case}) לא תואם לתיק הנוכחי ({case_number})")

    # Keep a timestamped copy of the file being replaced
    if dest.exists():
        backup_dir = dest.parent / "backup"
        backup_dir.mkdir(exist_ok=True)
        stamp = time.strftime("%Y%m%d-%H%M%S")
        shutil.copy2(dest, backup_dir / f"analysis-and-research-{stamp}.md")

    # Swap the staged file into place
    tmp.replace(dest)

    section_counts = {
        "threshold_claims": len(thresholds),
        "issues": len(issues),
        "has_conclusions": bool(parsed.get("conclusions", "").strip()),
    }
    return {"status": "ok", "sections": section_counts, "file_size": len(content)}
class ChairPositionRequest(BaseModel):
    # Request body for PATCH .../research/analysis/chair-position.
    # section_id must match ^(threshold|issue)_\d+$ — the endpoint rejects
    # anything else with a 400.
    section_id: str
    position: str = ""  # text written into the section's chair_position field
@app.patch("/api/cases/{case_number}/research/analysis/chair-position")
async def api_research_chair_position(case_number: str, req: ChairPositionRequest):
    """Set the chair_position of one subsection in analysis-and-research.md.

    The write goes directly to the file (atomic rename). Answers 404 when the
    file is missing or the updater raises ValueError, 400 for a malformed
    section_id, and 500 (logged) for any other failure.
    """
    path = _research_file_path(case_number)
    if not path.exists():
        raise HTTPException(404, "הקובץ לא נמצא")

    section_ok = re.match(r"^(threshold|issue)_\d+$", req.section_id)
    if section_ok is None:
        raise HTTPException(400, "section_id לא תקין")

    try:
        outcome = research_md.update_chair_position(path, req.section_id, req.position)
    except ValueError as e:
        raise HTTPException(404, str(e))
    except Exception as e:
        logger.exception("Failed to update chair position")
        raise HTTPException(500, f"שגיאה בשמירה: {e}")
    else:
        return outcome
# ── Precedents API — attached case-law quotes for the compose phase ──
class PrecedentCreateRequest(BaseModel):
    # Request body for POST /api/cases/{case_number}/precedents. The endpoint
    # rejects a blank quote or citation with a 400.
    quote: str
    citation: str
    section_id: str = ""  # empty = case-level / general discussion; otherwise threshold_N or issue_N
    chair_note: str = ""
    pdf_document_id: str = ""  # UUID string, empty = no PDF
@app.post("/api/cases/{case_number}/precedents")
async def api_precedent_attach(case_number: str, req: PrecedentCreateRequest):
    """Attach a legal precedent (quote + citation) to a case.

    The precedent may be scoped to one threshold_claim / issue section.
    Cross-case library reuse happens at the search endpoint — this one always
    inserts a new row. Answers 400 for a malformed section_id or a blank
    quote/citation, and 404 when the tool reports an error.
    """
    section = req.section_id
    if section and re.match(r"^(threshold|issue)_\d+$", section) is None:
        raise HTTPException(400, "section_id לא תקין")
    if not (req.quote.strip() and req.citation.strip()):
        raise HTTPException(400, "quote ו-citation חובה")

    raw = await precedents_tools.precedent_attach(
        case_number=case_number,
        quote=req.quote,
        citation=req.citation,
        section_id=section,
        chair_note=req.chair_note,
        pdf_document_id=req.pdf_document_id,
    )
    data = json.loads(raw)
    failure = data.get("error")
    if failure:
        raise HTTPException(404, failure)
    return data
@app.post("/api/cases/{case_number}/precedents/upload-pdf")
async def api_precedent_upload_pdf(
    case_number: str,
    file: UploadFile = File(...),
):
    """Archive an uploaded precedent file and register it as a document.

    Accepts .pdf, .docx and .doc. The file is written under the case's
    ``documents/precedents`` directory (a numeric suffix avoids overwriting
    an existing name) and a `documents` row with doc_type='precedent_archive'
    is created. Returns the new document id, for use with POST /precedents,
    and the stored filename. No SSE / background processing — archive only,
    no text extraction.
    """
    case = await db.get_case_by_number(case_number)
    if not case:
        raise HTTPException(404, f"תיק {case_number} לא נמצא")
    if not file.filename:
        raise HTTPException(400, "No filename provided")

    original = Path(file.filename)
    ext = original.suffix.lower()
    if ext not in {".pdf", ".docx", ".doc"}:
        raise HTTPException(400, f"סוג קובץ לא נתמך לפסיקה: {ext}")

    content = await file.read()
    if len(content) > MAX_FILE_SIZE:
        raise HTTPException(400, f"קובץ גדול מדי. מקסימום: {MAX_FILE_SIZE // (1024*1024)}MB")

    # Precedents get their own subdirectory so they don't mix with
    # extracted originals.
    precedents_dir = config.find_case_dir(case_number) / "documents" / "precedents"
    precedents_dir.mkdir(parents=True, exist_ok=True)

    safe_name = re.sub(r"[^\w\u0590-\u05FF\s.\-()]", "", original.stem).strip()
    stem = safe_name or "precedent"
    dest = precedents_dir / f"{stem}{ext}"
    attempt = 0
    while dest.exists():
        attempt += 1
        dest = precedents_dir / f"{stem}-{attempt}{ext}"
    dest.write_bytes(content)

    doc = await db.create_document(
        case_id=UUID(case["id"]),
        doc_type="precedent_archive",
        title=stem,
        file_path=str(dest),
    )
    return {"document_id": doc["id"], "filename": dest.name}
@app.get("/api/cases/{case_number}/precedents")
async def api_precedent_list(case_number: str):
    """Return every precedent attached to a case (the client groups by section_id).

    Answers 404 when the tool reply is a dict carrying an error.
    """
    data = json.loads(await precedents_tools.precedent_list(case_number))
    failed = isinstance(data, dict) and data.get("error")
    if failed:
        raise HTTPException(404, data["error"])
    return data
@app.delete("/api/precedents/{precedent_id}")
async def api_precedent_delete(precedent_id: str):
    """Remove a precedent attachment.

    The archived PDF (if any) stays in the documents table — orphaned
    references nullify via FK ON DELETE SET NULL — so we keep the audit trail
    of the file. Answers 400 when the tool reports an error and 404 when
    nothing was deleted.
    """
    data = json.loads(await precedents_tools.precedent_remove(precedent_id))
    failure = data.get("error")
    if failure:
        raise HTTPException(400, failure)
    if data.get("deleted"):
        return data
    raise HTTPException(404, "לא נמצא")
@app.get("/api/precedents/search")
async def api_precedent_search(q: str, practice_area: str = "", limit: int = 10):
    """Typeahead over the cross-case precedent library; one row per distinct citation."""
    raw = await precedents_tools.precedent_search_library(q, practice_area, limit)
    matches = json.loads(raw)
    return matches
# ── Exports API — drafts, versions, download, upload, mark-final ──
@app.get("/api/cases/{case_number}/exports")
async def api_list_exports(case_number: str):
    """List a case's exported .docx files, most recently modified first.

    Returns an empty list when the exports directory does not exist. A file
    counts as final when its name starts with "סופי-".
    """
    export_dir = config.find_case_dir(case_number) / "exports"
    if not export_dir.exists():
        return []

    newest_first = sorted(
        export_dir.iterdir(),
        key=lambda entry: entry.stat().st_mtime,
        reverse=True,
    )
    listing = []
    for entry in newest_first:
        if not entry.is_file() or entry.suffix.lower() != ".docx":
            continue
        info = entry.stat()
        listing.append({
            "filename": entry.name,
            "size": info.st_size,
            "created_at": info.st_mtime,
            "is_final": entry.name.startswith("סופי-"),
        })
    return listing
@app.get("/api/cases/{case_number}/exports/{filename}/download")
async def api_download_export(case_number: str, filename: str):
    """Download one exported file from the case's ``exports`` directory.

    Raises:
        HTTPException: 404 when ``filename`` does not name a regular file
            located directly inside the exports directory.
    """
    export_dir = config.find_case_dir(case_number) / "exports"
    path = export_dir / filename
    # is_file(), not exists(): names such as "." or ".." exist and pass the
    # lexical parent check below, but they are directories and would make
    # FileResponse fail with a 500 instead of a clean 404.
    if not path.is_file() or not path.parent.samefile(export_dir):
        raise HTTPException(404, "קובץ לא נמצא")
    return FileResponse(
        path,
        media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        filename=filename,
    )
@app.post("/api/cases/{case_number}/exports/upload")
async def api_upload_export(case_number: str, file: UploadFile = File(...)):
    """Store an uploaded, revised .docx draft as the next "עריכה-vN.docx".

    N is one more than the highest version already present in the exports
    directory (starting at 1). Answers 404 for an unknown case and 400 for a
    missing filename, a non-DOCX file, or a file above MAX_FILE_SIZE.
    """
    case = await db.get_case_by_number(case_number)
    if not case:
        raise HTTPException(404, f"תיק {case_number} לא נמצא")
    if not file.filename:
        raise HTTPException(400, "No filename provided")
    if Path(file.filename).suffix.lower() != ".docx":
        raise HTTPException(400, "רק קבצי DOCX נתמכים")

    content = await file.read()
    if len(content) > MAX_FILE_SIZE:
        raise HTTPException(400, f"קובץ גדול מדי. מקסימום: {MAX_FILE_SIZE // (1024*1024)}MB")

    export_dir = config.find_case_dir(case_number) / "exports"
    export_dir.mkdir(parents=True, exist_ok=True)

    # Next version = highest parseable existing version + 1
    next_ver = 1
    for prior in sorted(export_dir.glob("עריכה-v*.docx")):
        try:
            seen = int(prior.stem.split("-v")[1])
        except (IndexError, ValueError):
            continue
        next_ver = max(next_ver, seen + 1)

    dest = export_dir / f"עריכה-v{next_ver}.docx"
    dest.write_bytes(content)
    return {"filename": dest.name, "size": len(content), "version": next_ver}
@app.post("/api/cases/{case_number}/exports/{filename}/mark-final")
async def api_mark_final(case_number: str, filename: str):
    """Mark an export as the final version of the decision.

    Copies the chosen export to ``סופי-<case>.docx`` in the exports directory
    and to ``החלטה-<case>.docx`` in the training directory, then sets the
    case status to ``final``.

    Raises:
        HTTPException: 404 when the case does not exist or ``filename`` does
            not name a regular file directly inside the exports directory.
    """
    case = await db.get_case_by_number(case_number)
    if not case:
        raise HTTPException(404, f"תיק {case_number} לא נמצא")

    export_dir = config.find_case_dir(case_number) / "exports"
    source = export_dir / filename
    # is_file(), not exists(): "." / ".." pass the lexical parent check but
    # are directories, which copy2 cannot handle.
    if not source.is_file() or not source.parent.samefile(export_dir):
        raise HTTPException(404, "קובץ לא נמצא")

    # Copy to the final name. Re-marking the file that already is the final
    # copy would make copy2 raise SameFileError, so skip the self-copy.
    final_name = f"סופי-{case_number}.docx"
    final_path = export_dir / final_name
    if not (final_path.exists() and source.samefile(final_path)):
        shutil.copy2(str(source), str(final_path))

    # Also copy to training directory for future style learning
    config.TRAINING_DIR.mkdir(parents=True, exist_ok=True)
    training_dest = config.TRAINING_DIR / f"החלטה-{case_number}.docx"
    shutil.copy2(str(source), str(training_dest))

    # Update case status to final
    pool = await db.get_pool()
    async with pool.acquire() as conn:
        await conn.execute(
            "UPDATE cases SET status = 'final', updated_at = now() WHERE id = $1",
            UUID(case["id"]),
        )
    return {
        "final_filename": final_name,
        "training_copy": str(training_dest),
        "status": "final",
    }
@app.post("/api/cases/{case_number}/export-docx")
async def api_export_docx(case_number: str):
    """Kick off a DOCX export for a case and return the tool's JSON reply.

    A reply that is not valid JSON is raised as a 500 with the reply as the
    detail.
    """
    raw = await drafting_tools.export_docx(case_number)
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError:
        raise HTTPException(500, raw)
    else:
        return payload
@app.get("/api/documents/{doc_id}/text")
async def api_document_text(doc_id: str):
    """Return the extracted text stored for a document id.

    Answers 400 when ``doc_id`` is not a UUID and 404 when no text comes back
    for it.
    """
    try:
        document_uuid = UUID(doc_id)
    except ValueError:
        raise HTTPException(400, f"Invalid document ID: {doc_id}")

    text = await db.get_document_text(document_uuid)
    if text:
        return {"doc_id": doc_id, "text": text}
    raise HTTPException(404, f"Document {doc_id} not found or has no text")
# ── Integration Endpoints — Gitea & Paperclip ────────────────────
# Hebrew filename prefix for each known document type.
DOC_TYPE_NAMES = {
    "appeal": "כתב-ערר",
    "response": "תשובת",
    "protocol": "פרוטוקול-דיון",
    "plan": "תכנית",
    "decision": "החלטה",
    "court_decision": "פסק-דין",
    "permit": "היתר",
    "appraisal": "שומה",
    "exhibit": "נספח",
    "objection": "התנגדות",
    "reference": "מסמך-עזר",
}


def generate_doc_filename(doc_type: str, case_number: str, party_name: str = "", ext: str = ".pdf") -> str:
    """Generate a clear Hebrew filename for a document.

    The name is ``<type>[-<party>]-<case_number><ext>`` where ``<type>`` is
    the DOC_TYPE_NAMES entry for ``doc_type`` (or ``doc_type`` itself when it
    is unknown).

    Args:
        doc_type: Document type key, e.g. ``"appeal"``.
        case_number: Case number; path separators are replaced with ``-`` so
            the result is always a single path component.
        party_name: Optional party name. Characters other than word
            characters, Hebrew letters and whitespace are dropped, and
            whitespace runs become a single ``-``. A name that sanitizes to
            nothing is omitted.
        ext: File extension including the leading dot.

    Returns:
        The generated filename.
    """
    parts = [DOC_TYPE_NAMES.get(doc_type, doc_type)]
    if party_name:
        cleaned = re.sub(r"[^\w\u0590-\u05FF\s]", "", party_name).strip()
        safe_party = re.sub(r"\s+", "-", cleaned)
        # Skip an empty result so the name does not end up with "--".
        if safe_party:
            parts.append(safe_party)
    # Case numbers may contain "/" (e.g. "1234/24"), which would otherwise
    # turn the filename into a nested path.
    parts.append(re.sub(r"[\\/]+", "-", case_number))
    return "-".join(parts) + ext
class GiteaRepoRequest(BaseModel):
    # Request body for POST /api/integrations/gitea/create-repo; all three
    # fields are handed to create_repo in this order.
    case_number: str
    title: str
    description: str = ""
@app.post("/api/integrations/gitea/create-repo")
async def api_gitea_create_repo(req: GiteaRepoRequest):
    """Create a Gitea repo in the 'cases' org and link the local case directory to it.

    The push is attempted only when the local case directory exists. Any
    failure while creating the repo answers 502.
    """
    try:
        repo = await create_repo(req.case_number, req.title, req.description)
    except Exception as e:
        raise HTTPException(502, f"Gitea error: {e}")

    html_url = repo.get("html_url", "")
    clone_url = repo.get("clone_url") or html_url
    case_dir = config.find_case_dir(req.case_number)
    pushed = setup_remote_and_push(case_dir, clone_url) if case_dir.exists() else False
    return {"repo_url": html_url, "clone_url": clone_url, "pushed": pushed}
class PaperclipProjectRequest(BaseModel):
    # Request body for POST /api/integrations/paperclip/create-project; every
    # field is forwarded to pc_create_project.
    case_number: str
    title: str
    description: str = ""
    appeal_type: str = "רישוי"
@app.post("/api/integrations/paperclip/create-project")
async def api_paperclip_create_project(req: PaperclipProjectRequest):
    """Create a project in Paperclip's embedded DB and return it.

    Any failure answers 502 with the error text.
    """
    project_args = {
        "case_number": req.case_number,
        "title": req.title,
        "description": req.description,
        "appeal_type": req.appeal_type,
    }
    try:
        project = await pc_create_project(**project_args)
    except Exception as e:
        raise HTTPException(502, f"Paperclip error: {e}")
    else:
        return project
# ── Skill Management API ───────────────────────────────────────────
# Connection string for the Paperclip Postgres database. Override with the
# PAPERCLIP_DB_URL environment variable; the fallback targets 127.0.0.1:54329
# with built-in credentials.
PAPERCLIP_DB_URL = os.environ.get(
    "PAPERCLIP_DB_URL", "postgresql://paperclip:paperclip@127.0.0.1:54329/paperclip"
)
# In Docker: mounted at /paperclip-skills; locally: ~/.paperclip/instances/default/skills
_docker_skills = Path("/paperclip-skills")
_local_skills = Path.home() / ".paperclip" / "instances" / "default" / "skills"
# The Docker mount wins whenever it exists; this is decided once, at import time.
PAPERCLIP_SKILLS_DIR = _docker_skills if _docker_skills.exists() else _local_skills
# Default company ID for skills; override with PAPERCLIP_COMPANY_ID. Skills
# are stored under PAPERCLIP_SKILLS_DIR / <company id> / <slug>.
SKILLS_COMPANY_ID = os.environ.get("PAPERCLIP_COMPANY_ID", "42a7acd0-30c5-4cbd-ac97-7424f65df294")
@app.get("/api/admin/skills")
async def api_list_skills():
    """List installed Paperclip skills with their DB / disk sync status.

    Skills recorded in the Paperclip DB are reported with their markdown
    length, file inventory, last update and whether a matching directory
    (and SKILL.md) exists on disk. Skill directories on disk that have no DB
    row are appended with ``"not_in_db": True``. When the Paperclip DB cannot
    be reached within 5 seconds, the failure is logged and only the on-disk
    skills are listed.
    """
    rows = []
    try:
        conn = await asyncpg.connect(PAPERCLIP_DB_URL, timeout=5)
        try:
            rows = await conn.fetch(
                "SELECT slug, name, length(markdown) as md_chars, file_inventory, updated_at "
                "FROM company_skills WHERE company_id = $1::uuid ORDER BY slug",
                SKILLS_COMPANY_ID,
            )
        finally:
            await conn.close()
    except (
        OSError,
        asyncpg.PostgresError,
        asyncpg.InterfaceError,
        TimeoutError,
        # Before Python 3.11 asyncio.TimeoutError is a separate class from the
        # builtin TimeoutError, so the connect timeout must be named explicitly.
        asyncio.TimeoutError,
    ) as exc:
        # Paperclip DB unreachable — continue with disk-only skills
        logger.warning("Paperclip DB unreachable, listing disk-only skills: %s", exc)

    company_dir = PAPERCLIP_SKILLS_DIR / SKILLS_COMPANY_ID

    skills = []
    for r in rows:
        slug = r["slug"]
        skill_dir = company_dir / slug
        disk_exists = skill_dir.exists()
        disk_skill_md = None
        if disk_exists:
            skill_md = skill_dir / "SKILL.md"
            if skill_md.exists():
                disk_skill_md = skill_md.stat().st_size
        inventory = r["file_inventory"]
        if isinstance(inventory, str):
            # The column may arrive as raw JSON text; decode it in that case.
            inventory = json.loads(inventory)
        skills.append({
            "slug": slug,
            "name": r["name"],
            "db_markdown_chars": r["md_chars"],
            "file_inventory": inventory,
            "updated_at": r["updated_at"].isoformat() if r["updated_at"] else None,
            "disk_exists": disk_exists,
            "disk_skill_md_bytes": disk_skill_md,
        })

    # Also check for skills on disk that aren't in DB
    if company_dir.exists():
        db_slugs = {s["slug"] for s in skills}
        for d in sorted(company_dir.iterdir()):
            if d.is_dir() and d.name not in db_slugs:
                skill_md = d / "SKILL.md"
                skills.append({
                    "slug": d.name,
                    "name": d.name,
                    "db_markdown_chars": 0,
                    "file_inventory": [],
                    "updated_at": None,
                    "disk_exists": True,
                    "disk_skill_md_bytes": skill_md.stat().st_size if skill_md.exists() else None,
                    "not_in_db": True,
                })
    return skills
@app.post("/api/admin/skills/install")
async def api_install_skill(file: UploadFile = File(...)):
    """Install or update a Paperclip skill from a ZIP file.

    The ZIP should contain a SKILL.md at root (or in a single subdirectory).
    The skill slug is derived from the directory name or ZIP filename.

    The archive is validated (SKILL.md present, usable slug, every member
    resolving inside the skill directory) before the existing skill directory
    is cleared, so a rejected upload leaves the installed files untouched.

    Returns:
        A dict with the slug, the action taken ("updated" or "installed"),
        the number of extracted files, the file inventory and the length of
        SKILL.md in characters.

    Raises:
        HTTPException: 400 for a missing filename, a non-ZIP name, an
            oversized or corrupt archive, a missing SKILL.md, an unusable
            slug, or a member path that escapes the skill directory;
            500 if SKILL.md is not on disk after extraction.
    """
    import io

    if not file.filename:
        raise HTTPException(400, "No filename provided")
    if not file.filename.lower().endswith(".zip"):
        raise HTTPException(400, "Only ZIP files are supported")
    content = await file.read()
    if len(content) > 100 * 1024 * 1024:  # 100MB limit
        raise HTTPException(400, "File too large (max 100MB)")

    try:
        zf = zipfile.ZipFile(io.BytesIO(content))
    except zipfile.BadZipFile:
        raise HTTPException(400, "Invalid ZIP file")

    # The context manager closes the archive on every exit path.
    with zf:
        names = zf.namelist()

        # Find the first SKILL.md; everything before it is the skill root.
        skill_md_path = next(
            (name for name in names if name.split("/")[-1] == "SKILL.md"),
            None,
        )
        if not skill_md_path:
            raise HTTPException(400, "ZIP must contain a SKILL.md file")
        skill_root = skill_md_path[: -len("SKILL.md")]

        # Determine slug: from directory name in ZIP, or from ZIP filename
        if skill_root and skill_root.strip("/"):
            slug = skill_root.strip("/").split("/")[0]
        else:
            slug = Path(file.filename).stem.lower()
            slug = re.sub(r"[^\w\-]", "-", slug).strip("-")

        # An empty, "." or ".." slug would point skill_dir at a parent
        # directory, and the clean-up below would then delete other skills.
        if not slug or slug in {".", ".."} or "/" in slug or "\\" in slug:
            raise HTTPException(400, "Cannot derive a valid skill slug from the ZIP")

        skill_dir = PAPERCLIP_SKILLS_DIR / SKILLS_COMPANY_ID / slug
        base = skill_dir.resolve()

        # Select and validate members BEFORE touching the disk.
        members = []  # (archive name, path relative to the skill root)
        for name in names:
            if name.endswith("/"):
                continue  # skip directories
            if not name.startswith(skill_root):
                continue  # skip files outside skill root
            rel_path = name[len(skill_root):]
            if not rel_path:
                continue
            # Skip macOS metadata and dot-files
            if "/__MACOSX/" in name or rel_path.startswith(("__MACOSX/", ".")):
                continue
            # Zip-slip guard: "a/../../x" or an absolute member name must not
            # resolve to a location outside the skill directory.
            if base not in (base / rel_path).resolve().parents:
                raise HTTPException(400, f"ZIP contains an unsafe path: {name}")
            members.append((name, rel_path))

        skill_dir.mkdir(parents=True, exist_ok=True)

        # Clear existing contents
        for item in skill_dir.rglob("*"):
            if item.is_file():
                item.unlink()
        # Remove empty subdirs (deepest first)
        for item in sorted(skill_dir.rglob("*"), reverse=True):
            if item.is_dir():
                try:
                    item.rmdir()
                except OSError:
                    pass  # not empty; leave it in place

        # Extract files, stripping the skill_root prefix
        extracted_files = []
        for name, rel_path in members:
            dest = skill_dir / rel_path
            dest.parent.mkdir(parents=True, exist_ok=True)
            dest.write_bytes(zf.read(name))
            extracted_files.append(rel_path)

    # Read SKILL.md content
    skill_md_file = skill_dir / "SKILL.md"
    if not skill_md_file.exists():
        raise HTTPException(500, "SKILL.md was not extracted properly")
    markdown_content = skill_md_file.read_text(encoding="utf-8")

    # Build file_inventory: classify each extracted path by its location.
    file_inventory = []
    for rel in sorted(extracted_files):
        if rel == "SKILL.md":
            kind = "skill"
        elif rel.startswith("scripts/"):
            kind = "script"
        elif rel.startswith("references/"):
            kind = "reference"
        elif rel.endswith(".zip"):
            kind = "archive"
        else:
            kind = "resource"
        file_inventory.append({"kind": kind, "path": rel})

    # Update DB: update the row for this company/slug if present, else insert.
    conn = await asyncpg.connect(PAPERCLIP_DB_URL)
    try:
        existing = await conn.fetchval(
            "SELECT id FROM company_skills WHERE company_id = $1::uuid AND slug = $2",
            SKILLS_COMPANY_ID, slug,
        )
        if existing:
            await conn.execute(
                """UPDATE company_skills
                SET markdown = $1, file_inventory = $2::jsonb, updated_at = now()
                WHERE id = $3""",
                markdown_content,
                json.dumps(file_inventory, ensure_ascii=False),
                existing,
            )
            action = "updated"
        else:
            await conn.execute(
                """INSERT INTO company_skills
                (company_id, key, slug, name, markdown, source_type, file_inventory)
                VALUES ($1::uuid, $2, $3, $4, $5, 'local_path', $6::jsonb)""",
                SKILLS_COMPANY_ID, slug, slug, slug,
                markdown_content,
                json.dumps(file_inventory, ensure_ascii=False),
            )
            action = "installed"
    finally:
        await conn.close()

    return {
        "slug": slug,
        "action": action,
        "files_extracted": len(extracted_files),
        "file_inventory": file_inventory,
        "markdown_chars": len(markdown_content),
    }
@app.post("/api/admin/skills/{slug}/sync")
async def api_sync_skill(slug: str):
    """Sync a skill from disk into the DB (for skills that exist on disk but not in DB).

    Reads SKILL.md from the skill's directory, builds a file inventory from
    the files found there, and updates the existing ``company_skills`` row
    for this company/slug or inserts a new one.

    Returns:
        A dict with the slug, the action taken ("updated" or "inserted"),
        the file inventory and the length of SKILL.md in characters.

    Raises:
        HTTPException: 404 if the skill directory does not exist,
            400 if it has no SKILL.md.
    """
    skill_dir = PAPERCLIP_SKILLS_DIR / SKILLS_COMPANY_ID / slug
    if not skill_dir.exists():
        raise HTTPException(404, f"Skill directory not found on disk: {slug}")
    skill_md_file = skill_dir / "SKILL.md"
    if not skill_md_file.exists():
        raise HTTPException(400, f"No SKILL.md found in {slug}")
    markdown_content = skill_md_file.read_text(encoding="utf-8")

    # Build file_inventory from disk
    file_inventory = []
    for f in sorted(skill_dir.rglob("*")):
        if not f.is_file():
            continue
        # Forward slashes on every platform, so the prefix checks below and
        # the stored inventory paths look the same as those written by the
        # ZIP install endpoint.
        rel = f.relative_to(skill_dir).as_posix()
        # Skip dot-files and macOS metadata, including a top-level
        # "__MACOSX/" directory (which has no leading slash in rel).
        if rel.startswith((".", "__MACOSX/")) or "/__MACOSX/" in rel:
            continue
        if rel == "SKILL.md":
            kind = "skill"
        elif rel.startswith("scripts/"):
            kind = "script"
        elif rel.startswith("references/"):
            kind = "reference"
        elif rel.endswith(".zip"):
            kind = "archive"
        else:
            kind = "resource"
        file_inventory.append({"kind": kind, "path": rel})

    conn = await asyncpg.connect(PAPERCLIP_DB_URL)
    try:
        existing = await conn.fetchval(
            "SELECT id FROM company_skills WHERE company_id = $1::uuid AND slug = $2",
            SKILLS_COMPANY_ID, slug,
        )
        if existing:
            await conn.execute(
                """UPDATE company_skills
                SET markdown = $1, file_inventory = $2::jsonb, updated_at = now()
                WHERE id = $3""",
                markdown_content,
                json.dumps(file_inventory, ensure_ascii=False),
                existing,
            )
            action = "updated"
        else:
            await conn.execute(
                """INSERT INTO company_skills
                (company_id, key, slug, name, markdown, source_type, file_inventory)
                VALUES ($1::uuid, $2, $3, $4, $5, 'local_path', $6::jsonb)""",
                SKILLS_COMPANY_ID, slug, slug, slug,
                markdown_content,
                json.dumps(file_inventory, ensure_ascii=False),
            )
            action = "inserted"
    finally:
        await conn.close()

    return {
        "slug": slug,
        "action": action,
        "file_inventory": file_inventory,
        "markdown_chars": len(markdown_content),
    }
@app.delete("/api/admin/skills/{slug}")
async def api_delete_skill(slug: str):
    """Remove a skill's row from the DB; files on disk are left untouched.

    Raises:
        HTTPException: 404 when the DELETE statement matched no row.
    """
    connection = await asyncpg.connect(PAPERCLIP_DB_URL)
    try:
        status = await connection.execute(
            "DELETE FROM company_skills WHERE company_id = $1::uuid AND slug = $2",
            SKILLS_COMPANY_ID,
            slug,
        )
    finally:
        await connection.close()

    # The returned command status is "DELETE 0" when nothing was removed.
    if status != "DELETE 0":
        return {"slug": slug, "action": "deleted"}
    raise HTTPException(404, f"Skill '{slug}' not found in DB")
@app.post("/api/admin/paperclip/restart")
async def api_restart_paperclip():
    """Restart the Paperclip PM2 process.

    Tries pm2 directly (works when running locally on the host).
    In Docker, writes a restart flag file that the host watcher picks up.

    Returns:
        ``{"status": "restarted", ...}`` when pm2 exits with code 0, otherwise
        ``{"status": "restart_requested", ...}`` once the flag file is written.

    Raises:
        HTTPException: 500 if pm2 did not succeed and the flag file cannot
            be written either.
    """
    # Try pm2 directly (works when running outside Docker). If the binary is
    # missing, subprocess.run raises FileNotFoundError (an OSError); a hung
    # pm2 raises TimeoutExpired. Both must fall through to the flag file
    # rather than surface as an unhandled 500.
    try:
        result = subprocess.run(
            ["pm2", "restart", "paperclip"],
            capture_output=True, text=True, timeout=15,
        )
    except (OSError, subprocess.TimeoutExpired) as exc:
        logger.warning("pm2 restart unavailable, falling back to flag file: %s", exc)
    else:
        if result.returncode == 0:
            return {"status": "restarted", "method": "pm2", "output": result.stdout.strip()}

    # Fallback: write a flag file that host-side watcher picks up
    flag_file = PAPERCLIP_SKILLS_DIR / ".restart-requested"
    try:
        flag_file.write_text(str(time.time()))
    except OSError as exc:
        raise HTTPException(
            500, "Cannot restart Paperclip from Docker. Run manually: pm2 restart paperclip"
        ) from exc
    return {
        "status": "restart_requested",
        "method": "flag_file",
        "message": "Restart requested — the host watcher will restart Paperclip shortly.",
    }
@app.post("/api/cases/{case_number}/documents/upload-tagged")
async def api_upload_tagged_document(
    case_number: str,
    file: UploadFile = File(...),
    doc_type: str = Form("auto"),
    party_name: str = Form(""),
    title: str = Form(""),
):
    """Upload a document to a case with tagging and auto-rename.

    The file is written under the case's ``documents/originals`` directory,
    a document record is created, and processing is started as a background
    task whose state is tracked in ``_progress`` under the returned task id.

    Returns:
        A dict with the task id, the name the file was actually saved under,
        the original upload name and the requested doc type.

    Raises:
        HTTPException: 404 if the case does not exist; 400 for a missing
            filename, an unsupported extension or an oversized file.
    """
    case = await db.get_case_by_number(case_number)
    if not case:
        raise HTTPException(404, f"תיק {case_number} לא נמצא")
    if not file.filename:
        raise HTTPException(400, "No filename provided")
    ext = Path(file.filename).suffix.lower()
    if ext not in ALLOWED_EXTENSIONS:
        raise HTTPException(400, f"סוג קובץ לא נתמך: {ext}")
    content = await file.read()
    if len(content) > MAX_FILE_SIZE:
        raise HTTPException(400, f"קובץ גדול מדי. מקסימום: {MAX_FILE_SIZE // (1024*1024)}MB")

    # Generate smart filename — keep original name for auto classification
    if doc_type == "auto":
        safe_name = re.sub(r"[^\w\u0590-\u05FF\s.\-()]", "", Path(file.filename).stem).strip()
        new_filename = f"{safe_name or 'document'}{ext}"
    else:
        new_filename = generate_doc_filename(doc_type, case_number, party_name, ext)

    # Save to case directory
    case_dir = config.find_case_dir(case_number) / "documents" / "originals"
    case_dir.mkdir(parents=True, exist_ok=True)
    dest = case_dir / new_filename

    # Handle duplicates by appending -1, -2, ... to the stem.
    stem = new_filename.rsplit(".", 1)[0]
    counter = 1
    while dest.exists():
        dest = case_dir / f"{stem}-{counter}{ext}"
        counter += 1
    dest.write_bytes(content)
    # Report the name that was actually written; after a collision it
    # differs from new_filename.
    saved_name = dest.name

    # Create document record. The default title comes from the un-suffixed
    # stem, so a collision suffix does not leak into it.
    case_id = UUID(case["id"])
    doc_title = title or stem.replace("-", " ")
    doc = await db.create_document(
        case_id=case_id,
        doc_type=doc_type if doc_type != "auto" else "reference",
        title=doc_title,
        file_path=str(dest),
    )

    # Process in background
    task_id = str(uuid4())
    _progress[task_id] = {"status": "queued", "filename": saved_name}
    asyncio.create_task(
        _process_tagged_document(
            task_id, dest, case_number, case_id, UUID(doc["id"]), doc_type, saved_name
        )
    )
    return {
        "task_id": task_id,
        "filename": saved_name,
        "original_name": file.filename,
        "doc_type": doc_type,
    }
async def _process_tagged_document(task_id: str, dest: Path, case_number: str, case_id: UUID, doc_id: UUID, doc_type: str, display_name: str):
    """Process an uploaded tagged document in the background.

    Runs ``processor.process_document`` and then makes a best-effort
    ``git add`` / ``commit`` / ``push`` in the case directory. The outcome is
    recorded in ``_progress[task_id]`` as "completed" or "failed"; exceptions
    are logged and never propagate to the caller.
    """
    try:
        _progress[task_id] = {"status": "processing", "filename": display_name, "step": "extracting"}
        result = await processor.process_document(doc_id, case_id)

        # Git commit + push. Exit codes are not checked, so a failed commit
        # or push is tolerated. Launch errors (e.g. git not found) are
        # tolerated too, so version control problems cannot flip a
        # successfully processed document to "failed".
        repo_dir = config.find_case_dir(case_number)
        if repo_dir.exists():
            env = {
                "GIT_AUTHOR_NAME": "Ezer Mishpati", "GIT_AUTHOR_EMAIL": "legal@local",
                "GIT_COMMITTER_NAME": "Ezer Mishpati", "GIT_COMMITTER_EMAIL": "legal@local",
                "PATH": "/usr/bin:/bin",
            }
            doc_type_hebrew = DOC_TYPE_NAMES.get(doc_type, doc_type)
            try:
                subprocess.run(["git", "add", "."], cwd=repo_dir, capture_output=True)
                subprocess.run(
                    ["git", "commit", "-m", f"הוספת {doc_type_hebrew}: {display_name}"],
                    cwd=repo_dir, capture_output=True, env=env,
                )
                # Try to push to Gitea. GIT_TERMINAL_PROMPT=0 stops git from
                # prompting for credentials.
                # NOTE: subprocess.run is synchronous; this call waits for
                # the push to finish.
                subprocess.run(["git", "push"], cwd=repo_dir, capture_output=True, env={
                    **env,
                    "GIT_TERMINAL_PROMPT": "0",
                })
            except OSError:
                logger.warning("Git commit/push skipped for %s", display_name, exc_info=True)

        _progress[task_id] = {
            "status": "completed",
            "filename": display_name,
            "result": result,
            "case_number": case_number,
            "doc_type": doc_type,
        }
    except Exception as e:
        logger.exception("Processing failed for %s", display_name)
        _progress[task_id] = {"status": "failed", "error": str(e), "filename": display_name}
@app.post("/api/cases/{case_number}/documents/{doc_id}/reprocess")
async def api_reprocess_document(case_number: str, doc_id: str):
    """Reprocess a failed document.

    Resets the document's extraction status to "pending", deletes its stored
    chunks and starts ``processor.process_document`` as a background task.

    Raises:
        HTTPException: 404 if the case does not exist or the document does
            not belong to it; 400 if ``doc_id`` is not a valid UUID.
    """
    case = await db.get_case_by_number(case_number)
    if not case:
        raise HTTPException(404, f"תיק {case_number} לא נמצא")
    case_id = UUID(case["id"])

    # A malformed id is a client error, not a server fault.
    try:
        document_id = UUID(doc_id)
    except ValueError:
        raise HTTPException(400, "Invalid document id") from None

    doc = await db.get_document(document_id)
    if not doc or UUID(doc["case_id"]) != case_id:
        raise HTTPException(404, "מסמך לא נמצא בתיק")

    # Reset status and clean old chunks
    await db.update_document(document_id, extraction_status="pending")
    await db.delete_document_chunks(document_id)

    # Process in background
    asyncio.create_task(processor.process_document(document_id, case_id))
    return {"status": "reprocessing"}
@app.delete("/api/cases/{case_number}/documents/{doc_id}")
async def api_delete_document(case_number: str, doc_id: str):
    """Delete a single document from a case (including its chunks and file).

    Removing the physical file is best-effort: a failure is logged and the
    database record is deleted regardless.

    Raises:
        HTTPException: 404 if the case does not exist or the document does
            not belong to it; 400 if ``doc_id`` is not a valid UUID.
    """
    case = await db.get_case_by_number(case_number)
    if not case:
        raise HTTPException(404, f"תיק {case_number} לא נמצא")
    case_id = UUID(case["id"])

    # A malformed id is a client error, not a server fault.
    try:
        document_id = UUID(doc_id)
    except ValueError:
        raise HTTPException(400, "Invalid document id") from None

    doc = await db.get_document(document_id)
    if not doc or UUID(doc["case_id"]) != case_id:
        raise HTTPException(404, "מסמך לא נמצא בתיק")

    # Try to remove the physical file. missing_ok already covers a file that
    # is gone, so no separate exists() check is needed.
    file_path = doc.get("file_path")
    if file_path:
        try:
            Path(file_path).unlink(missing_ok=True)
        except OSError:
            logger.warning(
                "Could not remove file %s for document %s", file_path, doc_id, exc_info=True
            )

    await db.delete_document(document_id)
    return {"deleted": True, "doc_id": doc_id}
# ── Chair feedback endpoints ──────────────────────────────────────
@app.get("/api/feedback")
async def api_list_feedback(
    case_number: str = "",
    category: str = "",
    unresolved_only: bool = False,
):
    """List chair feedback, optionally filtered by case/category.

    Returns:
        A list of feedback dicts, each enriched with the ``case_number`` of
        its case ("" when the feedback has no case or the case row is gone).
        When ``case_number`` is given but matches no case, the list is empty.
    """
    case_id = None
    if case_number:
        case = await db.get_case_by_number(case_number)
        if not case:
            # A filter on a non-existent case matches nothing. Falling
            # through with case_id=None would return feedback for ALL cases.
            return []
        case_id = UUID(case["id"])

    feedbacks = await db.list_chair_feedback(
        case_id=case_id,
        category=category or None,
        unresolved_only=unresolved_only,
    )

    items = []
    # Build case_number lookup: one query per distinct case id. Misses are
    # cached as "" so a missing case is not queried again for every row.
    case_numbers: dict[str, str] = {}
    pool = await db.get_pool()
    for fb in feedbacks:
        cid = fb.get("case_id")
        cn = ""
        if cid:
            key = str(cid)
            if key not in case_numbers:
                async with pool.acquire() as conn:
                    row = await conn.fetchrow(
                        "SELECT case_number FROM cases WHERE id = $1", cid,
                    )
                case_numbers[key] = row["case_number"] if row else ""
            cn = case_numbers[key]
        items.append({
            "id": str(fb["id"]),
            "case_id": str(fb["case_id"]) if fb["case_id"] else None,
            "case_number": cn,
            "block_id": fb["block_id"],
            "category": fb["category"],
            "feedback_text": fb["feedback_text"],
            "lesson_extracted": fb["lesson_extracted"],
            "resolved": fb["resolved"],
            "applied_to": fb.get("applied_to", []),
            "created_at": fb["created_at"].isoformat() if fb.get("created_at") else None,
        })
    return items
@app.post("/api/feedback")
async def api_create_feedback(
    case_number: str = Form(""),
    block_id: str = Form("block-yod"),
    feedback_text: str = Form(...),
    category: str = Form("missing_content"),
    lesson_extracted: str = Form(""),
):
    """Store one chair feedback entry submitted as form fields.

    The entry is linked to a case only when ``case_number`` is given and
    matches an existing case; otherwise it is stored with no case id.

    Raises:
        HTTPException: 400 when ``category`` is not one of the known values.
    """
    # Resolve the optional case reference first.
    matched_case = await db.get_case_by_number(case_number) if case_number else None
    case_id = UUID(matched_case["id"]) if matched_case else None

    valid_categories = (
        "missing_content",
        "wrong_tone",
        "wrong_structure",
        "factual_error",
        "style",
        "other",
    )
    if category not in valid_categories:
        raise HTTPException(400, f"קטגוריה לא חוקית. אפשרויות: {', '.join(valid_categories)}")

    new_id = await db.record_chair_feedback(
        case_id=case_id,
        block_id=block_id,
        feedback_text=feedback_text,
        category=category,
        lesson_extracted=lesson_extracted,
    )
    return {"id": str(new_id), "status": "created"}
@app.post("/api/feedback/json")
async def api_create_feedback_json(body: dict):
    """Record a new chair feedback entry (JSON body).

    Recognised keys: ``case_number``, ``block_id`` (default "block-yod"),
    ``feedback_text`` (required), ``category`` (default "missing_content")
    and ``lesson_extracted``. The entry is linked to a case only when
    ``case_number`` matches an existing case.

    Raises:
        HTTPException: 400 when ``feedback_text`` is missing or blank, or
            when ``category`` is not one of the known values.
    """
    # The form endpoint declares feedback_text as required; enforce the same
    # here so an empty entry cannot be recorded through the JSON route.
    feedback_text = body.get("feedback_text", "")
    if not isinstance(feedback_text, str) or not feedback_text.strip():
        raise HTTPException(400, "feedback_text is required")

    valid_categories = [
        "missing_content", "wrong_tone", "wrong_structure",
        "factual_error", "style", "other",
    ]
    category = body.get("category", "missing_content")
    if category not in valid_categories:
        raise HTTPException(400, f"קטגוריה לא חוקית. אפשרויות: {', '.join(valid_categories)}")

    case_number = body.get("case_number", "")
    case_id = None
    if case_number:
        case = await db.get_case_by_number(case_number)
        if case:
            case_id = UUID(case["id"])

    feedback_id = await db.record_chair_feedback(
        case_id=case_id,
        block_id=body.get("block_id", "block-yod"),
        feedback_text=feedback_text,
        category=category,
        lesson_extracted=body.get("lesson_extracted", ""),
    )
    return {"id": str(feedback_id), "status": "created"}
@app.patch("/api/feedback/{feedback_id}/resolve")
async def api_resolve_feedback(feedback_id: str, body: dict):
    """Mark feedback as resolved.

    ``body["applied_to"]`` (default: empty list) is passed through to
    ``db.resolve_chair_feedback``.

    Raises:
        HTTPException: 400 if ``feedback_id`` is not a valid UUID.
    """
    # A malformed id is a client error, not a server fault.
    try:
        parsed_id = UUID(feedback_id)
    except ValueError:
        raise HTTPException(400, "Invalid feedback id") from None

    await db.resolve_chair_feedback(
        feedback_id=parsed_id,
        applied_to=body.get("applied_to", []),
    )
    return {"status": "resolved"}
# ── Background Processing ─────────────────────────────────────────
async def _process_file(task_id: str, source: Path, req: ClassifyRequest):
    """Dispatch a classified upload to the case or training pipeline.

    Any exception raised by the chosen pipeline is logged and recorded in
    ``_progress[task_id]`` as a "failed" entry instead of propagating.
    """
    try:
        # Category "case" goes to the case pipeline; anything else is
        # treated as training material.
        handler = (
            _process_case_document
            if req.category == "case"
            else _process_training_document
        )
        await handler(task_id, source, req)
    except Exception as e:
        logger.exception("Processing failed for %s", req.filename)
        _progress[task_id] = {"status": "failed", "error": str(e), "filename": req.filename}
async def _process_case_document(task_id: str, source: Path, req: ClassifyRequest):
    """Process a case document (mirrors documents.document_upload logic).

    Steps, each reflected in ``_progress[task_id]``: look up the case, copy
    the upload into the case's ``documents/originals`` directory, create the
    document record, run ``processor.process_document``, make a best-effort
    git commit in the case directory, and delete the upload.

    If the case does not exist, a "failed" entry is recorded and the function
    returns without raising. Other errors propagate to the caller.
    """
    _progress[task_id] = {"status": "validating", "filename": req.filename}
    case = await db.get_case_by_number(req.case_number)
    if not case:
        # Carry the filename like every other progress entry for this task.
        _progress[task_id] = {
            "status": "failed",
            "error": f"Case {req.case_number} not found",
            "filename": req.filename,
        }
        return
    case_id = UUID(case["id"])
    title = req.title or source.stem.split("_", 1)[-1]  # Remove timestamp prefix

    # Copy to case directory
    _progress[task_id] = {"status": "copying", "filename": req.filename}
    case_dir = config.find_case_dir(req.case_number) / "documents" / "originals"
    case_dir.mkdir(parents=True, exist_ok=True)
    # Use original name without timestamp prefix
    original_name = re.sub(r"^\d+_", "", source.name)
    dest = case_dir / original_name
    shutil.copy2(str(source), str(dest))

    # Create document record
    _progress[task_id] = {"status": "registering", "filename": req.filename}
    doc = await db.create_document(
        case_id=case_id,
        doc_type=req.doc_type,
        title=title,
        file_path=str(dest),
    )

    # Process (extract → chunk → embed → store)
    _progress[task_id] = {"status": "processing", "filename": req.filename, "step": "extracting"}
    result = await processor.process_document(UUID(doc["id"]), case_id)

    # Git commit. Exit codes are not checked, so a failed commit is
    # tolerated. Launch errors (e.g. git not found) are tolerated too:
    # otherwise a document that is already registered and processed would be
    # reported as failed and its upload left behind.
    repo_dir = config.find_case_dir(req.case_number)
    if repo_dir.exists():
        doc_type_hebrew = {
            "appeal": "כתב ערר", "response": "תשובה", "decision": "החלטה",
            "reference": "מסמך עזר", "exhibit": "נספח",
        }.get(req.doc_type, req.doc_type)
        try:
            subprocess.run(["git", "add", "."], cwd=repo_dir, capture_output=True)
            subprocess.run(
                ["git", "commit", "-m", f"הוספת {doc_type_hebrew}: {title}"],
                cwd=repo_dir, capture_output=True,
                env={"GIT_AUTHOR_NAME": "Ezer Mishpati", "GIT_AUTHOR_EMAIL": "legal@local",
                     "GIT_COMMITTER_NAME": "Ezer Mishpati", "GIT_COMMITTER_EMAIL": "legal@local",
                     "PATH": "/usr/bin:/bin"},
            )
        except OSError:
            logger.warning("Git commit skipped for %s", req.filename, exc_info=True)

    # Remove from uploads
    source.unlink(missing_ok=True)
    _progress[task_id] = {
        "status": "completed",
        "filename": req.filename,
        "result": result,
        "case_number": req.case_number,
        "doc_type": req.doc_type,
    }
async def _process_training_document(task_id: str, source: Path, req: ClassifyRequest):
    """Ingest a training document (mirrors documents.document_upload_training logic).

    Copies the upload into ``config.TRAINING_DIR``, extracts its text, adds
    it to the style corpus, and, when the text yields chunks, registers a
    document record and stores embedded chunks for it. The upload is deleted
    at the end and a "completed" entry is written to ``_progress[task_id]``.
    """
    from datetime import date as date_type

    def report(step: str) -> None:
        # All "processing" progress entries share this shape.
        _progress[task_id] = {"status": "processing", "filename": req.filename, "step": step}

    title = req.title or source.stem.split("_", 1)[-1]

    # Copy to training directory, dropping the numeric "<digits>_" prefix.
    _progress[task_id] = {"status": "copying", "filename": req.filename}
    training_dir = config.TRAINING_DIR
    training_dir.mkdir(parents=True, exist_ok=True)
    dest = training_dir / re.sub(r"^\d+_", "", source.name)
    shutil.copy2(str(source), str(dest))

    # Extract text
    report("extracting")
    text, page_count = await extractor.extract_text(str(dest))

    # Parse date (ISO format) when one was supplied.
    d_date = date_type.fromisoformat(req.decision_date) if req.decision_date else None

    # Add to style corpus
    report("corpus")
    corpus_id = await db.add_to_style_corpus(
        document_id=None,
        decision_number=req.decision_number,
        decision_date=d_date,
        subject_categories=req.subject_categories,
        full_text=text,
    )

    # Chunk and embed
    report("chunking")
    chunks = chunker.chunk_document(text)
    chunk_count = 0
    if chunks:
        record = await db.create_document(
            case_id=None,
            doc_type="decision",
            title=f"[קורפוס] {title}",
            file_path=str(dest),
            page_count=page_count,
        )
        doc_id = UUID(record["id"])
        await db.update_document(doc_id, extracted_text=text, extraction_status="completed")

        report("embedding")
        vectors = await embeddings.embed_texts(
            [chunk.content for chunk in chunks], input_type="document"
        )
        chunk_dicts = []
        for chunk, vector in zip(chunks, vectors):
            chunk_dicts.append({
                "content": chunk.content,
                "section_type": chunk.section_type,
                "embedding": vector,
                "page_number": chunk.page_number,
                "chunk_index": chunk.chunk_index,
            })
        await db.store_chunks(doc_id, None, chunk_dicts)
        chunk_count = len(chunks)

    # Remove from uploads
    source.unlink(missing_ok=True)

    summary = {
        "corpus_id": str(corpus_id),
        "title": title,
        "pages": page_count,
        "text_length": len(text),
        "chunks": chunk_count,
    }
    _progress[task_id] = {
        "status": "completed",
        "filename": req.filename,
        "result": summary,
    }