Files
legal-ai/mcp-server/src/legal_mcp/tools/documents.py
Chaim 79b9c37301 feat(mcp): FU-14 GAP-48 פרוסה 2 — envelope אחיד ל-11 משפחות-כלים
המשך מיגרציית INV-TOOL1 מעבר למשפחת-החיפוש (#71). הומרו ל-{status,data,message}:
precedent_library, citations, internal_decisions, missing_precedents,
training_enrichment, precedents, legal_arguments, cases, documents, workflow
(~55 כלים). בוטלו 5 עותקי _ok/_err משוכפלים (alias ל-tools/envelope.py — SSoT, G2).

עיקרון: envelope-status = הצלחת-הקריאה-לכלי; תוצאה-עסקית (idempotent_existing,
noop, completed...) נשמרת בתוך data. err רק לכשל אמיתי (not-found/invalid/exception).

תאימות-API: צרכני web/app.py של cases/workflow/precedents חוּוטו דרך
envelope_unwrap + בדיקת status=="error"→4xx — תשובת ה-HTTP זהה, web-ui לא מושפע.
(documents/legal_arguments/citations/... אינם נצרכים מ-app.py — agent-only.)

בדיקות: 182/182 עוברים (test_corpus_constraints עודכן לחוזה החדש).
נותר: משפחת drafting (מסלול הפקת-ההחלטה) בפרוסה נפרדת עם שער טסט-ייצוא.

Invariants: מקדם INV-TOOL1 + G2 (SSoT, ביטול כפילות). מתועד ב-X9 + gap-audit.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 17:41:39 +00:00

487 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""MCP tools for document management and processing."""
from __future__ import annotations
import hashlib
import json
import shutil
from pathlib import Path
from uuid import UUID
from legal_mcp import config
from legal_mcp.services import audit, db, git_sync, processor
from legal_mcp.tools.envelope import empty, err, ok # GAP-48: SSoT envelope
async def document_upload(
case_number: str,
file_path: str,
doc_type: str = "auto",
title: str = "",
) -> str:
"""העלאה ועיבוד מסמך לתיק ערר. מחלץ טקסט, יוצר chunks ו-embeddings.
Args:
case_number: מספר תיק הערר
file_path: נתיב מלא לקובץ (PDF, DOCX, RTF, TXT)
doc_type: סוג מסמך (auto=סיווג אוטומטי, appeal=כתב ערר, response=תשובה, protocol=פרוטוקול, plan=תכנית, permit=היתר, court_decision=פסק דין, decision=החלטת ועדה, appraisal=שומה, objection=התנגדות, exhibit=נספח, reference=מסמך עזר)
title: שם המסמך (אם ריק, ייקח משם הקובץ)
"""
case = await db.get_case_by_number(case_number)
if not case:
return err(f"תיק {case_number} לא נמצא.")
source = Path(file_path)
if not source.exists():
return err(f"קובץ לא נמצא: {file_path}")
case_id = UUID(case["id"])
if not title:
title = source.stem
# INV-TOOL3 / GAP-52: idempotent on (case_id, file content hash). Re-uploading
# the same bytes returns the existing document and skips re-copy + re-OCR +
# re-embed (the expensive part).
content_hash = hashlib.sha256(source.read_bytes()).hexdigest()
existing_doc = await db.get_document_by_hash(case_id, content_hash)
if existing_doc:
return ok({
"document": existing_doc,
"idempotent_existing": True,
}, message=f"הקובץ כבר הועלה לתיק {case_number} (זהה ב-hash) — מוחזר הקיים, ללא עיבוד מחדש.")
# Copy file to case directory
case_dir = config.find_case_dir(case_number) / "documents" / "originals"
case_dir.mkdir(parents=True, exist_ok=True)
dest = case_dir / source.name
shutil.copy2(str(source), str(dest))
# For auto classification, start with "reference" — will be updated after processing
initial_doc_type = doc_type if doc_type != "auto" else "reference"
# Create document record
doc = await db.create_document(
case_id=case_id,
doc_type=initial_doc_type,
title=title,
file_path=str(dest),
content_hash=content_hash,
)
# Process document (extract → classify → chunk → embed → store)
result = await processor.process_document(UUID(doc["id"]), case_id)
# If auto-classification, update doc_type from classification result
actual_doc_type = initial_doc_type
if doc_type == "auto" and result.get("classification"):
classified_type = result["classification"].get("classification", {}).get("doc_type", "")
if classified_type:
actual_doc_type = classified_type
await db.update_document(UUID(doc["id"]), doc_type=classified_type)
doc["doc_type"] = classified_type
# Git commit + push (best-effort — don't fail upload on git errors)
try:
repo_dir = config.find_case_dir(case_number)
if repo_dir.exists():
doc_type_hebrew = {
"appeal": "כתב ערר",
"response": "תשובה",
"protocol": "פרוטוקול",
"plan": "תכנית",
"permit": "היתר",
"court_decision": "פסק דין",
"decision": "החלטה",
"appraisal": "שומה",
"objection": "התנגדות",
"exhibit": "נספח",
"reference": "מסמך עזר",
}.get(actual_doc_type, actual_doc_type)
git_sync.commit_and_push(repo_dir, f"הוספת {doc_type_hebrew}: {title}")
except Exception:
pass # git not available in container — non-critical
await audit.log_action_safe(
"document_upload", case_id=case_id, document_id=UUID(doc["id"]),
details={"title": title, "doc_type": actual_doc_type},
)
return ok({
"document": doc,
"processing": result,
})
async def document_upload_training(
file_path: str,
decision_number: str = "",
decision_date: str = "",
subject_categories: list[str] | None = None,
title: str = "",
practice_area: str = "appeals_committee",
appeal_subtype: str = "",
) -> str:
"""העלאת החלטה קודמת של דפנה לקורפוס הסגנון (training).
Args:
file_path: נתיב מלא לקובץ ההחלטה
decision_number: מספר ההחלטה
decision_date: תאריך ההחלטה (YYYY-MM-DD)
subject_categories: קטגוריות - אפשר לבחור כמה (בנייה, שימוש חורג, תכנית, היתר, הקלה, חלוקה, תמ"א 38, היטל השבחה, פיצויים 197)
title: שם המסמך
practice_area: תחום משפטי (appeals_committee / national_insurance / labor_law)
appeal_subtype: סוג ערר (building_permit / betterment_levy / compensation_197).
ריק = יוסק אוטומטית ממספר ההחלטה
"""
from datetime import date as date_type
from legal_mcp.services import chunker, embeddings, extractor, practice_area as pa
source = Path(file_path)
if not source.exists():
return err(f"קובץ לא נמצא: {file_path}")
if not title:
title = source.stem
# Resolve subtype: explicit > derived from decision_number > 'unknown'
if not appeal_subtype:
appeal_subtype = pa.derive_subtype(decision_number, practice_area)
pa.validate(practice_area, appeal_subtype)
# Copy to training directory, organized by subtype
_SUBTYPE_DIRS = {
"betterment_levy": "cmpa",
"compensation_197": "cmpa",
"building_permit": "cmp",
}
subdir = _SUBTYPE_DIRS.get(appeal_subtype, "")
training_dest = config.TRAINING_DIR / subdir if subdir else config.TRAINING_DIR
training_dest.mkdir(parents=True, exist_ok=True)
dest = training_dest / source.name
if source.resolve() != dest.resolve():
shutil.copy2(str(source), str(dest))
# Extract text and strip Nevo preamble
text, page_count, _ = await extractor.extract_text(str(dest))
text = extractor.strip_nevo_preamble(text)
# Parse date
d_date = None
if decision_date:
d_date = date_type.fromisoformat(decision_date)
# Add to style corpus (tagged by domain so block-writer can filter)
corpus_id = await db.add_to_style_corpus(
document_id=None,
decision_number=decision_number,
decision_date=d_date,
subject_categories=subject_categories or [],
full_text=text,
practice_area=practice_area,
appeal_subtype=appeal_subtype,
)
# Chunk and embed for RAG search over training corpus
chunks = chunker.chunk_document(text)
if chunks:
# Create a document record (no case association — tag explicitly)
doc = await db.create_document(
case_id=None,
doc_type="decision",
title=f"[קורפוס] {title}",
file_path=str(dest),
page_count=page_count,
)
doc_id = UUID(doc["id"])
await db.update_document(
doc_id, extracted_text=text, extraction_status="completed",
metadata={"practice_area": practice_area, "appeal_subtype": appeal_subtype},
)
# Generate embeddings and store chunks
texts = [c.content for c in chunks]
embs = await embeddings.embed_texts(texts, input_type="document")
chunk_dicts = [
{
"content": c.content,
"section_type": c.section_type,
"embedding": emb,
"page_number": c.page_number,
"chunk_index": c.chunk_index,
}
for c, emb in zip(chunks, embs)
]
await db.store_chunks(doc_id, None, chunk_dicts)
return ok({
"corpus_id": str(corpus_id),
"title": title,
"pages": page_count,
"text_length": len(text),
"chunks": len(chunks) if chunks else 0,
})
async def document_get_text(case_number: str, doc_title: str = "") -> str:
"""קבלת טקסט מלא של מסמך מתוך תיק.
Args:
case_number: מספר תיק הערר
doc_title: שם המסמך (אם ריק, מחזיר את כל המסמכים)
"""
case = await db.get_case_by_number(case_number)
if not case:
return err(f"תיק {case_number} לא נמצא.")
docs = await db.list_documents(UUID(case["id"]))
if not docs:
return empty(f"אין מסמכים בתיק {case_number}.")
if doc_title:
docs = [d for d in docs if doc_title.lower() in d["title"].lower()]
if not docs:
return err(f"מסמך '{doc_title}' לא נמצא בתיק.")
results = []
for doc in docs:
text = await db.get_document_text(UUID(doc["id"]))
results.append({
"title": doc["title"],
"doc_type": doc["doc_type"],
"text": text[:10000] if text else "(ללא טקסט)",
})
return ok(results)
async def document_list(case_number: str) -> str:
"""רשימת מסמכים בתיק.
Args:
case_number: מספר תיק הערר
"""
case = await db.get_case_by_number(case_number)
if not case:
return err(f"תיק {case_number} לא נמצא.")
docs = await db.list_documents(UUID(case["id"]))
if not docs:
return empty(f"אין מסמכים בתיק {case_number}.")
return ok(docs)
async def extract_references(
case_number: str,
doc_title: str = "",
) -> str:
"""זיהוי תכניות, פסיקה וחקיקה מתוך מסמכי תיק.
Args:
case_number: מספר תיק הערר
doc_title: שם מסמך ספציפי (אם ריק, מזהה בכל המסמכים)
"""
from legal_mcp.services import references_extractor
case = await db.get_case_by_number(case_number)
if not case:
return err(f"תיק {case_number} לא נמצא.")
case_id = UUID(case["id"])
docs = await db.list_documents(case_id)
if not docs:
return empty(f"אין מסמכים בתיק {case_number}.")
if doc_title:
docs = [d for d in docs if doc_title.lower() in d["title"].lower()]
results = []
for doc in docs:
text = await db.get_document_text(UUID(doc["id"]))
if not text:
continue
refs = await references_extractor.extract_and_link_references(
UUID(doc["id"]), case_id, text,
)
results.append({
"document": doc["title"],
"plans": refs["plans"],
"case_law": refs["case_law"],
"case_law_linked": refs["case_law_linked"],
"legislation": refs["legislation"],
})
return ok(results)
async def extract_claims(
case_number: str,
doc_title: str = "",
party_hint: str = "",
) -> str:
"""חילוץ טענות מכתב טענות בתיק ושמירה ב-DB.
Args:
case_number: מספר תיק הערר
doc_title: שם מסמך ספציפי (אם ריק, מחלץ מכל כתבי הטענות)
party_hint: שם הצד המגיש (אם ידוע)
"""
from legal_mcp.services import claims_extractor
case = await db.get_case_by_number(case_number)
if not case:
return err(f"תיק {case_number} לא נמצא.")
case_id = UUID(case["id"])
docs = await db.list_documents(case_id)
if not docs:
return empty(f"אין מסמכים בתיק {case_number}.")
# Filter to claims documents (appeal, response) or specific doc
if doc_title:
docs = [d for d in docs if doc_title.lower() in d["title"].lower()]
else:
docs = [d for d in docs if d["doc_type"] in ("appeal", "response", "objection")]
if not docs:
return empty("לא נמצאו כתבי טענות בתיק.")
results = []
for doc in docs:
text = await db.get_document_text(UUID(doc["id"]))
if not text:
continue
result = await claims_extractor.extract_and_store_claims(
case_id=case_id,
document_id=UUID(doc["id"]),
text=text,
doc_type=doc["doc_type"],
party_hint=party_hint,
)
results.append(result)
await audit.log_action_safe(
"extract_claims", case_id=case_id,
details={"docs_processed": len(docs), "results": len(results)},
)
return ok(results)
async def get_claims(case_number: str, party_role: str = "") -> str:
"""שליפת טענות שחולצו לתיק.
Args:
case_number: מספר תיק הערר
party_role: סינון לפי צד (appellant/respondent/committee/permit_applicant). ריק = הכל.
"""
case = await db.get_case_by_number(case_number)
if not case:
return err(f"תיק {case_number} לא נמצא.")
claims = await db.get_claims(
UUID(case["id"]),
party_role=party_role if party_role else None,
)
if not claims:
return empty(f"אין טענות בתיק {case_number}.")
# Format for display
role_hebrew = {
"appellant": "עוררים",
"respondent": "משיבים",
"committee": "ועדה מקומית",
"permit_applicant": "מבקשי היתר",
"appraiser": "שמאי",
}
formatted = []
for c in claims:
formatted.append({
"party": role_hebrew.get(c["party_role"], c["party_role"]),
"claim": c["claim_text"],
"source": c.get("source_document", ""),
})
return ok(formatted)
# Whitelist of doc_type values; mirrors web/app.py:DOC_TYPE_NAMES.
ALLOWED_DOC_TYPES = {
"appeal", "response", "protocol", "plan", "decision",
"court_decision", "permit", "appraisal", "exhibit",
"objection", "reference",
}
# Allowed appraiser_side values; '' (empty) clears the tag.
ALLOWED_APPRAISER_SIDES = {"committee", "appellant", "deciding", ""}
async def document_update(
case_number: str,
doc_id: str,
doc_type: str = "",
appraiser_side: str = "",
) -> str:
"""עדכון תיוג מסמך — doc_type ו/או appraiser_side. ריק = אין שינוי.
הולידציה זהה ל-PATCH endpoint ב-web/app.py. appraiser_side נשמר ב-
documents.metadata JSONB (מתפרסם משם ע"י extract_appraiser_facts).
Args:
case_number: מספר תיק הערר (לאישור שייכות)
doc_id: UUID של המסמך
doc_type: ערך חדש (appeal/response/protocol/plan/decision/court_decision/
permit/appraisal/exhibit/objection/reference). ריק = אין שינוי.
appraiser_side: ערך חדש (committee/appellant/deciding). ריק = אין שינוי;
העבר במפורש מחרוזת ריקה לא-default אם רוצים לנקות.
"""
case = await db.get_case_by_number(case_number)
if not case:
return err(f"תיק {case_number} לא נמצא.")
try:
doc_uuid = UUID(doc_id)
except ValueError:
return err(f"doc_id לא תקין: {doc_id}")
doc = await db.get_document(doc_uuid)
if not doc:
return err(f"מסמך {doc_id} לא נמצא.")
if doc.get("case_id") != case["id"]:
return err(f"מסמך {doc_id} לא שייך לתיק {case_number}.")
updates: dict = {}
if doc_type:
if doc_type not in ALLOWED_DOC_TYPES:
return err(f"doc_type לא תקין: {doc_type}",
data={"allowed": sorted(ALLOWED_DOC_TYPES)})
updates["doc_type"] = doc_type
# appraiser_side is optional. The MCP tool can't distinguish "skip" from
# "set to empty string", so we use the convention: only update if non-empty.
# To clear, the operator must edit metadata directly (rare).
if appraiser_side:
if appraiser_side not in ALLOWED_APPRAISER_SIDES:
return err(f"appraiser_side לא תקין: {appraiser_side}",
data={"allowed": sorted(s for s in ALLOWED_APPRAISER_SIDES if s)})
metadata = doc.get("metadata") or {}
if isinstance(metadata, str):
metadata = json.loads(metadata)
metadata["appraiser_side"] = appraiser_side
updates["metadata"] = metadata
if not updates:
return ok({"noop": True}, message="אין שינוי לבצע.")
await db.update_document(doc_uuid, **updates)
fresh = await db.get_document(doc_uuid)
return ok({
"doc_id": doc_id,
"doc_type": fresh.get("doc_type"),
"metadata": fresh.get("metadata"),
})