Merge pull request 'feat(storage): X14 Phase 2c — remaining sync write-sites through storage.py' (#156) from worktree-storage-minio-phase2c into main
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 1m26s
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 1m26s
This commit was merged in pull request #156.
This commit is contained in:
@@ -14,6 +14,9 @@ from __future__ import annotations
|
|||||||
import logging
|
import logging
|
||||||
import re
|
import re
|
||||||
import shutil
|
import shutil
|
||||||
|
|
||||||
|
from legal_mcp import config
|
||||||
|
from legal_mcp.services import storage
|
||||||
import zipfile
|
import zipfile
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
@@ -304,10 +307,17 @@ def retrofit_bookmarks(
|
|||||||
end_idx = len(paragraphs) - 1
|
end_idx = len(paragraphs) - 1
|
||||||
ranges.append((name, start_idx, max(start_idx, end_idx)))
|
ranges.append((name, start_idx, max(start_idx, end_idx)))
|
||||||
|
|
||||||
# Backup if overwriting in place
|
# Backup if overwriting in place — through the storage layer (INV-STG1).
|
||||||
if backup and output_path.resolve() == docx_path.resolve():
|
if backup and output_path.resolve() == docx_path.resolve():
|
||||||
backup_path = docx_path.with_suffix(".pre-retrofit.docx")
|
backup_path = docx_path.with_suffix(".pre-retrofit.docx")
|
||||||
shutil.copy2(str(docx_path), str(backup_path))
|
try:
|
||||||
|
_bkey = backup_path.resolve().relative_to(
|
||||||
|
Path(config.DATA_DIR).resolve()).as_posix()
|
||||||
|
storage.put_file_sync(
|
||||||
|
docx_path, _bkey, bucket=storage.Bucket.DOCUMENTS,
|
||||||
|
content_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document")
|
||||||
|
except ValueError:
|
||||||
|
shutil.copy2(str(docx_path), str(backup_path))
|
||||||
|
|
||||||
# Inject bookmarks, skipping any that already exist
|
# Inject bookmarks, skipping any that already exist
|
||||||
next_id = _next_bookmark_id(doc_tree)
|
next_id = _next_bookmark_id(doc_tree)
|
||||||
|
|||||||
@@ -13,6 +13,9 @@ from __future__ import annotations
|
|||||||
|
|
||||||
import logging
|
import logging
|
||||||
import shutil
|
import shutil
|
||||||
|
|
||||||
|
from legal_mcp import config
|
||||||
|
from legal_mcp.services import storage
|
||||||
import zipfile
|
import zipfile
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
@@ -98,6 +101,22 @@ def _load_docx_xml(docx_path: Path) -> tuple[dict[str, bytes], etree._Element, e
|
|||||||
return members, document_tree, settings_tree
|
return members, document_tree, settings_tree
|
||||||
|
|
||||||
|
|
||||||
|
_DOCX_CTYPE = "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
|
||||||
|
|
||||||
|
|
||||||
|
def _persist_docx_sync(output_path: Path, data: bytes) -> None:
|
||||||
|
"""Persist DOCX bytes through the storage layer (INV-STG1); fall back to a
|
||||||
|
direct disk write when output_path is outside DATA_DIR (caller-provided)."""
|
||||||
|
out = Path(output_path)
|
||||||
|
try:
|
||||||
|
key = out.resolve().relative_to(Path(config.DATA_DIR).resolve()).as_posix()
|
||||||
|
storage.put_bytes_sync(key, data, bucket=storage.Bucket.DOCUMENTS,
|
||||||
|
content_type=_DOCX_CTYPE)
|
||||||
|
except ValueError:
|
||||||
|
out.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
out.write_bytes(data)
|
||||||
|
|
||||||
|
|
||||||
def _save_docx_xml(
|
def _save_docx_xml(
|
||||||
members: dict[str, bytes],
|
members: dict[str, bytes],
|
||||||
document_tree: etree._Element,
|
document_tree: etree._Element,
|
||||||
@@ -113,12 +132,11 @@ def _save_docx_xml(
|
|||||||
settings_tree, xml_declaration=True, encoding="UTF-8", standalone=True
|
settings_tree, xml_declaration=True, encoding="UTF-8", standalone=True
|
||||||
)
|
)
|
||||||
|
|
||||||
output_path.parent.mkdir(parents=True, exist_ok=True)
|
|
||||||
buffer = BytesIO()
|
buffer = BytesIO()
|
||||||
with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as zf:
|
with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as zf:
|
||||||
for name, data in members.items():
|
for name, data in members.items():
|
||||||
zf.writestr(name, data)
|
zf.writestr(name, data)
|
||||||
output_path.write_bytes(buffer.getvalue())
|
_persist_docx_sync(output_path, buffer.getvalue())
|
||||||
|
|
||||||
|
|
||||||
def _ensure_track_revisions(settings_tree: etree._Element) -> None:
|
def _ensure_track_revisions(settings_tree: etree._Element) -> None:
|
||||||
@@ -511,4 +529,11 @@ def copy_with_revisions(
|
|||||||
source_path: str | Path, output_path: str | Path,
|
source_path: str | Path, output_path: str | Path,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Copy source → output unchanged (used when revisions list is empty)."""
|
"""Copy source → output unchanged (used when revisions list is empty)."""
|
||||||
shutil.copy2(str(source_path), str(output_path))
|
out = Path(output_path)
|
||||||
|
try:
|
||||||
|
key = out.resolve().relative_to(Path(config.DATA_DIR).resolve()).as_posix()
|
||||||
|
storage.put_file_sync(source_path, key, bucket=storage.Bucket.DOCUMENTS,
|
||||||
|
content_type=_DOCX_CTYPE)
|
||||||
|
except ValueError:
|
||||||
|
out.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
shutil.copy2(str(source_path), str(out))
|
||||||
|
|||||||
@@ -23,6 +23,7 @@ from docx import Document as DocxDocument
|
|||||||
from striprtf.striprtf import rtf_to_text
|
from striprtf.striprtf import rtf_to_text
|
||||||
|
|
||||||
from legal_mcp import config
|
from legal_mcp import config
|
||||||
|
from legal_mcp.services import storage
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from google.cloud import vision
|
from google.cloud import vision
|
||||||
@@ -345,7 +346,19 @@ def render_pages_for_multimodal(
|
|||||||
max(1, int(img.height * ratio)),
|
max(1, int(img.height * ratio)),
|
||||||
)
|
)
|
||||||
thumb = img.resize(thumb_size, Image.Resampling.LANCZOS)
|
thumb = img.resize(thumb_size, Image.Resampling.LANCZOS)
|
||||||
thumb.save(thumb_path, "JPEG", quality=75, optimize=True)
|
# Persist the thumbnail (a DERIVED, regenerable artifact)
|
||||||
|
# through the storage layer (INV-STG1). Under the filesystem
|
||||||
|
# backend it lands at thumb_path exactly as before.
|
||||||
|
_tbuf = io.BytesIO()
|
||||||
|
thumb.save(_tbuf, "JPEG", quality=75, optimize=True)
|
||||||
|
try:
|
||||||
|
_tkey = thumb_path.resolve().relative_to(
|
||||||
|
Path(config.DATA_DIR).resolve()).as_posix()
|
||||||
|
storage.put_bytes_sync(
|
||||||
|
_tkey, _tbuf.getvalue(), bucket=storage.Bucket.DERIVED,
|
||||||
|
content_type="image/jpeg")
|
||||||
|
except ValueError:
|
||||||
|
thumb.save(thumb_path, "JPEG", quality=75, optimize=True)
|
||||||
|
|
||||||
out.append((img, thumb_path))
|
out.append((img, thumb_path))
|
||||||
finally:
|
finally:
|
||||||
|
|||||||
@@ -37,6 +37,7 @@ is absent (the default filesystem backend needs nothing extra).
|
|||||||
"""
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
import shutil
|
import shutil
|
||||||
import tempfile
|
import tempfile
|
||||||
@@ -470,3 +471,43 @@ def local_path(key, *, bucket=Bucket.DOCUMENTS) -> Path | None:
|
|||||||
|
|
||||||
async def ensure_local(key, *, bucket=Bucket.DOCUMENTS) -> Path:
|
async def ensure_local(key, *, bucket=Bucket.DOCUMENTS) -> Path:
|
||||||
return await get_storage().ensure_local(key, bucket=bucket)
|
return await get_storage().ensure_local(key, bucket=bucket)
|
||||||
|
|
||||||
|
|
||||||
|
# ── synchronous facade ─────────────────────────────────────────────
|
||||||
|
# A few legacy writers are plain sync functions (track-changes save, retrofit
|
||||||
|
# backup, the multimodal thumbnail renderer which runs in a worker thread via
|
||||||
|
# asyncio.to_thread). They go through the same layer via this blocking shim so
|
||||||
|
# INV-STG1 holds everywhere.
|
||||||
|
|
||||||
|
def _run_coro_blocking(coro):
|
||||||
|
"""Run a storage coroutine to completion from synchronous code.
|
||||||
|
|
||||||
|
No running loop in this thread (the common case — sync helpers, or a
|
||||||
|
to_thread worker) → asyncio.run. If a loop *is* already running here, the
|
||||||
|
coroutine is offloaded to a fresh thread so we never deadlock the loop."""
|
||||||
|
try:
|
||||||
|
asyncio.get_running_loop()
|
||||||
|
except RuntimeError:
|
||||||
|
return asyncio.run(coro)
|
||||||
|
box: dict = {}
|
||||||
|
|
||||||
|
def _worker():
|
||||||
|
box["value"] = asyncio.run(coro)
|
||||||
|
|
||||||
|
import threading
|
||||||
|
t = threading.Thread(target=_worker)
|
||||||
|
t.start()
|
||||||
|
t.join()
|
||||||
|
return box["value"]
|
||||||
|
|
||||||
|
|
||||||
|
def put_bytes_sync(key, data, *, bucket=Bucket.DOCUMENTS, content_type=None,
|
||||||
|
metadata=None) -> str:
|
||||||
|
return _run_coro_blocking(
|
||||||
|
put_bytes(key, data, bucket=bucket, content_type=content_type, metadata=metadata))
|
||||||
|
|
||||||
|
|
||||||
|
def put_file_sync(src, key, *, bucket=Bucket.DOCUMENTS, content_type=None,
|
||||||
|
metadata=None) -> str:
|
||||||
|
return _run_coro_blocking(
|
||||||
|
put_file(src, key, bucket=bucket, content_type=content_type, metadata=metadata))
|
||||||
|
|||||||
@@ -46,3 +46,32 @@ def test_stage_file_default_subdir(_tmp_datadir):
|
|||||||
dest = run(ingest._stage_file(src, _tmp_datadir / "digests", ""))
|
dest = run(ingest._stage_file(src, _tmp_datadir / "digests", ""))
|
||||||
assert dest.parent == _tmp_datadir / "digests" / "other"
|
assert dest.parent == _tmp_datadir / "digests" / "other"
|
||||||
assert dest.exists()
|
assert dest.exists()
|
||||||
|
|
||||||
|
|
||||||
|
def test_thumbnail_renderer_routes_through_storage(_tmp_datadir):
|
||||||
|
"""extractor.render_pages_for_multimodal (a sync renderer) now persists the
|
||||||
|
JPEG thumbnail via the sync storage facade — under filesystem it must land
|
||||||
|
at the requested thumbnail_dir."""
|
||||||
|
fitz = pytest.importorskip("fitz")
|
||||||
|
from legal_mcp.services import extractor
|
||||||
|
|
||||||
|
pdf = _tmp_datadir / "doc.pdf"
|
||||||
|
d = fitz.open()
|
||||||
|
d.new_page(width=200, height=200)
|
||||||
|
d.save(str(pdf))
|
||||||
|
d.close()
|
||||||
|
|
||||||
|
thumb_dir = _tmp_datadir / "cases" / "1" / "thumbnails" / "docid"
|
||||||
|
out = extractor.render_pages_for_multimodal(pdf, embed_dpi=72, thumb_dpi=36,
|
||||||
|
thumbnail_dir=thumb_dir)
|
||||||
|
assert len(out) == 1
|
||||||
|
_img, thumb_path = out[0]
|
||||||
|
assert thumb_path == thumb_dir / "p001.jpg"
|
||||||
|
assert thumb_path.exists() # written through storage.put_bytes_sync (DERIVED)
|
||||||
|
assert thumb_path.read_bytes()[:2] == b"\xff\xd8" # JPEG magic
|
||||||
|
|
||||||
|
|
||||||
|
def test_put_bytes_sync_roundtrip(_tmp_datadir):
|
||||||
|
src_key = "cases/1/exports/x.docx"
|
||||||
|
storage.put_bytes_sync(src_key, b"PK\x03\x04zip", bucket=storage.Bucket.DOCUMENTS)
|
||||||
|
assert (_tmp_datadir / src_key).read_bytes() == b"PK\x03\x04zip"
|
||||||
|
|||||||
@@ -99,8 +99,9 @@ ENV_CATALOG: dict[str, EnvSpec] = {
|
|||||||
),
|
),
|
||||||
"MINIO_ENDPOINT": EnvSpec(
|
"MINIO_ENDPOINT": EnvSpec(
|
||||||
"MINIO_ENDPOINT", "storage", "string",
|
"MINIO_ENDPOINT", "storage", "string",
|
||||||
"endpoint פנימי של MinIO (server-side, רשת Docker)",
|
"endpoint פנימי של MinIO (server-side, רשת Docker coolify — שם-קונטיינר עמיד)",
|
||||||
is_secret=False, is_editable=False, default="http://minio:9000",
|
is_secret=False, is_editable=False,
|
||||||
|
default="http://minio-bx2ykvw94xbutsex41hz4vv8:9000",
|
||||||
),
|
),
|
||||||
"MINIO_PUBLIC_ENDPOINT": EnvSpec(
|
"MINIO_PUBLIC_ENDPOINT": EnvSpec(
|
||||||
"MINIO_PUBLIC_ENDPOINT", "storage", "string",
|
"MINIO_PUBLIC_ENDPOINT", "storage", "string",
|
||||||
|
|||||||
Reference in New Issue
Block a user