Files
legal-ai/mcp-server/src/legal_mcp/tools/documents.py
Chaim e698419faf
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 3m13s
Fix git not found error crashing document uploads in container
Install git in Docker image and wrap all subprocess git calls in
try/except so a missing or failing git binary never kills an upload
that already succeeded.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-14 12:38:40 +00:00

389 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""MCP tools for document management and processing."""
from __future__ import annotations
import json
import shutil
import subprocess
from pathlib import Path
from uuid import UUID
from legal_mcp import config
from legal_mcp.services import db, processor
async def document_upload(
    case_number: str,
    file_path: str,
    doc_type: str = "auto",
    title: str = "",
) -> str:
    """Upload a document into an appeal case and process it.

    The file is copied into the case's ``documents/originals`` directory, a
    document record is created, and the processing pipeline is run
    (extract -> classify -> chunk -> embed -> store). Finally a best-effort
    git commit of the case directory is attempted.

    Args:
        case_number: Appeal case number.
        file_path: Full path to the file (PDF, DOCX, RTF, TXT).
        doc_type: Document type (auto=automatic classification,
            appeal=statement of appeal, response=response,
            protocol=hearing protocol, plan=plan, permit=permit,
            court_decision=court ruling, decision=committee decision,
            appraisal=appraisal, objection=objection, exhibit=exhibit,
            reference=reference document).
        title: Document title (if empty, taken from the file name).

    Returns:
        A JSON string with the keys ``document`` (the document record) and
        ``processing`` (the pipeline result), or a plain-text message when
        the case or the file cannot be found.
    """
    import logging

    case = await db.get_case_by_number(case_number)
    if not case:
        return f"תיק {case_number} לא נמצא."

    source = Path(file_path)
    if not source.exists():
        return f"קובץ לא נמצא: {file_path}"

    case_id = UUID(case["id"])
    if not title:
        title = source.stem

    # Copy file to case directory. Skip the copy when the file is already
    # there: shutil.copy2 raises SameFileError if source and destination are
    # the same file.
    case_dir = config.find_case_dir(case_number) / "documents" / "originals"
    case_dir.mkdir(parents=True, exist_ok=True)
    dest = case_dir / source.name
    if source.resolve() != dest.resolve():
        shutil.copy2(str(source), str(dest))

    # For auto classification, start with "reference"; it is updated after
    # processing if the classifier returns a type.
    initial_doc_type = doc_type if doc_type != "auto" else "reference"

    # Create document record
    doc = await db.create_document(
        case_id=case_id,
        doc_type=initial_doc_type,
        title=title,
        file_path=str(dest),
    )
    doc_id = UUID(doc["id"])

    # Process document (extract -> classify -> chunk -> embed -> store)
    result = await processor.process_document(doc_id, case_id)

    # If auto-classification, update doc_type from classification result
    actual_doc_type = initial_doc_type
    if doc_type == "auto" and result.get("classification"):
        classified_type = (
            result["classification"].get("classification", {}).get("doc_type", "")
        )
        if classified_type:
            actual_doc_type = classified_type
            await db.update_document(doc_id, doc_type=classified_type)
            doc["doc_type"] = classified_type

    # Git commit (best-effort: the upload already succeeded, so a missing or
    # failing git must never turn it into an error).
    try:
        repo_dir = config.find_case_dir(case_number)
        if repo_dir.exists():
            subprocess.run(["git", "add", "."], cwd=repo_dir, capture_output=True)
            doc_type_hebrew = {
                "appeal": "כתב ערר",
                "response": "תשובה",
                "protocol": "פרוטוקול",
                "plan": "תכנית",
                "permit": "היתר",
                "court_decision": "פסק דין",
                "decision": "החלטה",
                "appraisal": "שומה",
                "objection": "התנגדות",
                "exhibit": "נספח",
                "reference": "מסמך עזר",
            }.get(actual_doc_type, actual_doc_type)
            subprocess.run(
                ["git", "commit", "-m", f"הוספת {doc_type_hebrew}: {title}"],
                cwd=repo_dir,
                capture_output=True,
                env={"GIT_AUTHOR_NAME": "Ezer Mishpati", "GIT_AUTHOR_EMAIL": "legal@local",
                     "GIT_COMMITTER_NAME": "Ezer Mishpati", "GIT_COMMITTER_EMAIL": "legal@local",
                     "PATH": "/usr/bin:/bin"},
            )
    except Exception:
        # Non-critical (e.g. git not available in the container), but leave a
        # trace instead of swallowing the error silently.
        logging.getLogger(__name__).warning(
            "Skipping git commit for case %s", case_number, exc_info=True,
        )

    return json.dumps({
        "document": doc,
        "processing": result,
    }, default=str, ensure_ascii=False, indent=2)
async def document_upload_training(
    file_path: str,
    decision_number: str = "",
    decision_date: str = "",
    subject_categories: list[str] | None = None,
    title: str = "",
    practice_area: str = "appeals_committee",
    appeal_subtype: str = "",
) -> str:
    """Upload a previous decision by Dafna to the style corpus (training).

    The file is copied into ``config.TRAINING_DIR``, its text is extracted
    and added to the style corpus, and the text is chunked and embedded so
    the training corpus can be searched.

    Args:
        file_path: Full path to the decision file.
        decision_number: Decision number.
        decision_date: Decision date (YYYY-MM-DD).
        subject_categories: Categories; several may be chosen
            (בנייה = construction, שימוש חורג = nonconforming use,
            תכנית = plan, היתר = permit, הקלה = relaxation,
            חלוקה = subdivision, תמ"א 38 = TAMA 38,
            היטל השבחה = betterment levy,
            פיצויים 197 = compensation under section 197).
        title: Document title (if empty, taken from the file name).
        practice_area: Legal domain
            (appeals_committee / national_insurance / labor_law).
        appeal_subtype: Appeal type
            (building_permit / betterment_levy / compensation_197).
            Empty = derived automatically from the decision number.

    Returns:
        A JSON string with ``corpus_id``, ``title``, ``pages``,
        ``text_length`` and ``chunks``, or a plain-text message when the
        file cannot be found.

    Raises:
        ValueError: If ``decision_date`` is not a valid ISO date. This is
            raised before anything is copied or stored.
    """
    from datetime import date as date_type
    from legal_mcp.services import chunker, embeddings, extractor, practice_area as pa

    source = Path(file_path)
    if not source.exists():
        return f"קובץ לא נמצא: {file_path}"

    if not title:
        title = source.stem

    # Resolve subtype: explicit > derived from decision_number
    if not appeal_subtype:
        appeal_subtype = pa.derive_subtype(decision_number, practice_area)
    pa.validate(practice_area, appeal_subtype)

    # Parse the date up front so a malformed value fails before the file is
    # copied, instead of leaving an orphan copy in the training directory.
    d_date = None
    if decision_date:
        d_date = date_type.fromisoformat(decision_date)

    # Copy to training directory (skip if already there)
    config.TRAINING_DIR.mkdir(parents=True, exist_ok=True)
    dest = config.TRAINING_DIR / source.name
    if source.resolve() != dest.resolve():
        shutil.copy2(str(source), str(dest))

    # Extract text
    text, page_count = await extractor.extract_text(str(dest))

    # Add to style corpus (tagged by domain so block-writer can filter)
    corpus_id = await db.add_to_style_corpus(
        document_id=None,
        decision_number=decision_number,
        decision_date=d_date,
        subject_categories=subject_categories or [],
        full_text=text,
        practice_area=practice_area,
        appeal_subtype=appeal_subtype,
    )

    # Chunk and embed for RAG search over training corpus
    chunks = chunker.chunk_document(text)
    if chunks:
        # Create a document record (no case association, so tag explicitly)
        doc = await db.create_document(
            case_id=None,
            doc_type="decision",
            title=f"[קורפוס] {title}",
            file_path=str(dest),
            page_count=page_count,
            practice_area=practice_area,
            appeal_subtype=appeal_subtype,
        )
        doc_id = UUID(doc["id"])
        await db.update_document(doc_id, extracted_text=text, extraction_status="completed")

        # Generate embeddings and store chunks
        texts = [c.content for c in chunks]
        embs = await embeddings.embed_texts(texts, input_type="document")
        chunk_dicts = [
            {
                "content": c.content,
                "section_type": c.section_type,
                "embedding": emb,
                "page_number": c.page_number,
                "chunk_index": c.chunk_index,
            }
            for c, emb in zip(chunks, embs)
        ]
        await db.store_chunks(
            doc_id, None, chunk_dicts,
            practice_area=practice_area, appeal_subtype=appeal_subtype,
        )

    return json.dumps({
        "corpus_id": str(corpus_id),
        "title": title,
        "pages": page_count,
        "text_length": len(text),
        "chunks": len(chunks) if chunks else 0,
    }, default=str, ensure_ascii=False, indent=2)
async def document_get_text(case_number: str, doc_title: str = "") -> str:
    """Return the stored text of documents in a case.

    Args:
        case_number: Appeal case number.
        doc_title: Document title to match, case-insensitively, as a
            substring (if empty, all documents are returned).

    Returns:
        A JSON list of ``{"title", "doc_type", "text"}`` entries, with each
        text cut to its first 10000 characters, or a plain-text message when
        the case, its documents, or the requested title cannot be found.
    """
    case_row = await db.get_case_by_number(case_number)
    if not case_row:
        return f"תיק {case_number} לא נמצא."

    documents = await db.list_documents(UUID(case_row["id"]))
    if not documents:
        return f"אין מסמכים בתיק {case_number}."

    if doc_title:
        needle = doc_title.lower()
        documents = [
            entry for entry in documents if needle in entry["title"].lower()
        ]
        if not documents:
            return f"מסמך '{doc_title}' לא נמצא בתיק."

    payload = []
    for entry in documents:
        body = await db.get_document_text(UUID(entry["id"]))
        item = {"title": entry["title"], "doc_type": entry["doc_type"]}
        if body:
            item["text"] = body[:10000]
        else:
            # Placeholder shown when no text is stored for the document.
            item["text"] = "(ללא טקסט)"
        payload.append(item)

    return json.dumps(payload, ensure_ascii=False, indent=2)
async def document_list(case_number: str) -> str:
    """List the documents in a case.

    Args:
        case_number: Appeal case number.

    Returns:
        A JSON dump of the document records, or a plain-text message when
        the case is not found or has no documents.
    """
    case_row = await db.get_case_by_number(case_number)
    if case_row:
        documents = await db.list_documents(UUID(case_row["id"]))
        if documents:
            return json.dumps(documents, default=str, ensure_ascii=False, indent=2)
        return f"אין מסמכים בתיק {case_number}."
    return f"תיק {case_number} לא נמצא."
async def extract_references(
    case_number: str,
    doc_title: str = "",
) -> str:
    """Identify plans, case law and legislation in a case's documents.

    Args:
        case_number: Appeal case number.
        doc_title: Specific document title, matched case-insensitively as a
            substring (if empty, all documents are scanned).

    Returns:
        A JSON list with one entry per document that has stored text, each
        holding ``document``, ``plans``, ``case_law``, ``case_law_linked``
        and ``legislation``; or a plain-text message when the case, its
        documents, or the requested title cannot be found.
    """
    from legal_mcp.services import references_extractor

    case = await db.get_case_by_number(case_number)
    if not case:
        return f"תיק {case_number} לא נמצא."

    case_id = UUID(case["id"])
    docs = await db.list_documents(case_id)
    if not docs:
        return f"אין מסמכים בתיק {case_number}."

    if doc_title:
        wanted = doc_title.lower()
        docs = [d for d in docs if wanted in d["title"].lower()]
        if not docs:
            # Report a title that matches nothing explicitly rather than
            # returning an empty JSON list.
            return f"מסמך '{doc_title}' לא נמצא בתיק."

    results = []
    for doc in docs:
        doc_id = UUID(doc["id"])
        text = await db.get_document_text(doc_id)
        if not text:
            # Nothing to scan for documents without stored text.
            continue
        refs = await references_extractor.extract_and_link_references(
            doc_id, case_id, text,
        )
        results.append({
            "document": doc["title"],
            "plans": refs["plans"],
            "case_law": refs["case_law"],
            "case_law_linked": refs["case_law_linked"],
            "legislation": refs["legislation"],
        })

    return json.dumps(results, default=str, ensure_ascii=False, indent=2)
async def extract_claims(
    case_number: str,
    doc_title: str = "",
    party_hint: str = "",
) -> str:
    """Extract claims from a case's pleadings and store them in the DB.

    Args:
        case_number: Appeal case number.
        doc_title: Specific document title, matched case-insensitively as a
            substring (if empty, every document whose type is appeal,
            response or objection is used).
        party_hint: Name of the submitting party (if known).

    Returns:
        A JSON list of the extractor's per-document results, or a
        plain-text message when the case, its documents, or any matching
        pleading cannot be found.
    """
    from legal_mcp.services import claims_extractor

    case_row = await db.get_case_by_number(case_number)
    if not case_row:
        return f"תיק {case_number} לא נמצא."

    case_id = UUID(case_row["id"])
    documents = await db.list_documents(case_id)
    if not documents:
        return f"אין מסמכים בתיק {case_number}."

    # Narrow to the named document, or else to the pleading document types.
    if doc_title:
        needle = doc_title.lower()
        selected = [
            entry for entry in documents if needle in entry["title"].lower()
        ]
    else:
        pleading_types = ("appeal", "response", "objection")
        selected = [
            entry for entry in documents if entry["doc_type"] in pleading_types
        ]

    if not selected:
        return "לא נמצאו כתבי טענות בתיק."

    outcomes = []
    for entry in selected:
        document_id = UUID(entry["id"])
        body = await db.get_document_text(document_id)
        if body:
            outcome = await claims_extractor.extract_and_store_claims(
                case_id=case_id,
                document_id=document_id,
                text=body,
                doc_type=entry["doc_type"],
                party_hint=party_hint,
            )
            outcomes.append(outcome)

    return json.dumps(outcomes, default=str, ensure_ascii=False, indent=2)
async def get_claims(case_number: str, party_role: str = "") -> str:
    """Fetch the claims already extracted for a case.

    Args:
        case_number: Appeal case number.
        party_role: Filter by party
            (appellant/respondent/committee/permit_applicant).
            Empty = all parties.

    Returns:
        A JSON list of ``{"party", "claim", "source"}`` entries, where
        ``party`` is the Hebrew label for a known role and the raw role
        otherwise; or a plain-text message when the case is not found or
        has no claims.
    """
    case_row = await db.get_case_by_number(case_number)
    if not case_row:
        return f"תיק {case_number} לא נמצא."

    # An empty role means "no filter", which is passed to the DB as None.
    rows = await db.get_claims(UUID(case_row["id"]), party_role=party_role or None)
    if not rows:
        return f"אין טענות בתיק {case_number}."

    # Display labels for the party roles.
    role_labels = {
        "appellant": "עוררים",
        "respondent": "משיבים",
        "committee": "ועדה מקומית",
        "permit_applicant": "מבקשי היתר",
        "appraiser": "שמאי",
    }
    display_rows = [
        {
            "party": role_labels.get(row["party_role"], row["party_role"]),
            "claim": row["claim_text"],
            "source": row.get("source_document", ""),
        }
        for row in rows
    ]
    return json.dumps(display_rows, default=str, ensure_ascii=False, indent=2)