diff --git a/mcp-server/src/legal_mcp/services/block_writer.py b/mcp-server/src/legal_mcp/services/block_writer.py index ce6ac5c..b44c459 100644 --- a/mcp-server/src/legal_mcp/services/block_writer.py +++ b/mcp-server/src/legal_mcp/services/block_writer.py @@ -18,8 +18,10 @@ import re from datetime import date from uuid import UUID +from pathlib import Path + from legal_mcp import config -from legal_mcp.services import db, embeddings, claude_session, audit +from legal_mcp.services import db, embeddings, claude_session, audit, storage from legal_mcp.services.lessons import ( OUTCOME_LABELS_HE, PRACTICE_AREA_OVERRIDES, @@ -1119,7 +1121,13 @@ async def _update_draft_file(decision_id: UUID) -> None: draft_dir = config.find_case_dir(case_row["case_number"]) / "drafts" draft_dir.mkdir(parents=True, exist_ok=True) draft_path = draft_dir / "decision.md" - draft_path.write_text("\n\n".join(row["content"] for row in rows if row["content"]), encoding="utf-8") + draft_text = "\n\n".join(row["content"] for row in rows if row["content"]) + draft_path.write_text(draft_text, encoding="utf-8") # noqa: STG1 — sealed below + try: + _dkey = draft_path.resolve().relative_to(Path(config.DATA_DIR).resolve()).as_posix() + await storage.mirror(_dkey, draft_text.encode("utf-8"), bucket=storage.Bucket.DOCUMENTS) + except ValueError: + pass logger.info("Draft file synced: %s (%d blocks)", draft_path, len(rows)) diff --git a/mcp-server/src/legal_mcp/services/docx_exporter.py b/mcp-server/src/legal_mcp/services/docx_exporter.py index a4466c8..48d6c94 100644 --- a/mcp-server/src/legal_mcp/services/docx_exporter.py +++ b/mcp-server/src/legal_mcp/services/docx_exporter.py @@ -487,7 +487,7 @@ async def export_decision( await storage.put_bytes(key, data, bucket=storage.Bucket.DOCUMENTS, content_type=_docx_ctype) except ValueError: Path(output_path).parent.mkdir(parents=True, exist_ok=True) - Path(output_path).write_bytes(data) + Path(output_path).write_bytes(data) # noqa: STG1 — storage fallback (output_path outside DATA_DIR) logger.info("DOCX exported (mode=%s): %s", mode, output_path) return output_path diff --git a/mcp-server/src/legal_mcp/services/docx_retrofit.py b/mcp-server/src/legal_mcp/services/docx_retrofit.py index c3e1116..e585f8d 100644 --- a/mcp-server/src/legal_mcp/services/docx_retrofit.py +++ b/mcp-server/src/legal_mcp/services/docx_retrofit.py @@ -317,7 +317,7 @@ def retrofit_bookmarks( docx_path, _bkey, bucket=storage.Bucket.DOCUMENTS, content_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document") except ValueError: - shutil.copy2(str(docx_path), str(backup_path)) + shutil.copy2(str(docx_path), str(backup_path)) # noqa: STG1 — storage fallback # Inject bookmarks, skipping any that already exist next_id = _next_bookmark_id(doc_tree) diff --git a/mcp-server/src/legal_mcp/services/docx_reviser.py b/mcp-server/src/legal_mcp/services/docx_reviser.py index ae68bbb..ee79d73 100644 --- a/mcp-server/src/legal_mcp/services/docx_reviser.py +++ b/mcp-server/src/legal_mcp/services/docx_reviser.py @@ -114,7 +114,7 @@ def _persist_docx_sync(output_path: Path, data: bytes) -> None: content_type=_DOCX_CTYPE) except ValueError: out.parent.mkdir(parents=True, exist_ok=True) - out.write_bytes(data) + out.write_bytes(data) # noqa: STG1 — storage fallback def _save_docx_xml( @@ -536,4 +536,4 @@ def copy_with_revisions( content_type=_DOCX_CTYPE) except ValueError: out.parent.mkdir(parents=True, exist_ok=True) - shutil.copy2(str(source_path), str(out)) + shutil.copy2(str(source_path), str(out)) # noqa: STG1 — storage fallback diff --git a/mcp-server/src/legal_mcp/services/research_md.py b/mcp-server/src/legal_mcp/services/research_md.py index d7963c3..64e7ec2 100644 --- a/mcp-server/src/legal_mcp/services/research_md.py +++ b/mcp-server/src/legal_mcp/services/research_md.py @@ -346,7 +346,7 @@ def update_chair_position( # Atomic write tmp_path = file_path.with_suffix(file_path.suffix + ".tmp") - tmp_path.write_text(new_content, encoding="utf-8") + tmp_path.write_text(new_content, encoding="utf-8") # noqa: STG1 — atomic .tmp; in-place edit, S3 re-sync in Phase-2 read-wiring os.replace(tmp_path, file_path) preview = new_text.strip()[:120] diff --git a/mcp-server/src/legal_mcp/services/storage.py b/mcp-server/src/legal_mcp/services/storage.py index 05e1a2e..38d577b 100644 --- a/mcp-server/src/legal_mcp/services/storage.py +++ b/mcp-server/src/legal_mcp/services/storage.py @@ -473,6 +473,43 @@ async def ensure_local(key, *, bucket=Bucket.DOCUMENTS) -> Path: return await get_storage().ensure_local(key, bucket=bucket) +# ── mirror: dual-write seal for the not-yet-read-wired pipeline (INV-STG1) ────── +# A handful of upload/finalize paths still keep a copy on disk because the +# ingest/extract pipeline reads files by their DATA_DIR path (not yet wired to +# ensure_local). For those, ``mirror``/``mirror_file`` ALSO persist the blob to +# object storage when the active backend is s3/dual — so no blob is ever missing +# from MinIO (durability + presigned serving) even though a disk copy lingers +# for the pipeline. No-op under the filesystem backend (the disk write is the +# canonical copy). Best-effort: an S3 failure is logged, never breaks the +# request (the disk copy holds). The full fix (read-wire the pipeline → drop the +# disk copy) is tracked separately; until then this closes the data-loss leak. + +async def mirror(key, data, *, bucket=Bucket.DOCUMENTS, + content_type=None, metadata=None) -> None: + backend = get_storage() + if backend.name == "filesystem": + return + s3 = getattr(backend, "s3", backend) + try: + await s3.put_bytes(key, data, bucket=bucket, + content_type=content_type, metadata=metadata) + except Exception as exc: # noqa: BLE001 — log, never break the request + logger.warning("storage.mirror: S3 persist failed for %s: %s", key, exc) + + +async def mirror_file(src, key, *, bucket=Bucket.DOCUMENTS, + content_type=None, metadata=None) -> None: + backend = get_storage() + if backend.name == "filesystem": + return + s3 = getattr(backend, "s3", backend) + try: + await s3.put_file(src, key, bucket=bucket, + content_type=content_type, metadata=metadata) + except Exception as exc: # noqa: BLE001 + logger.warning("storage.mirror_file: S3 persist failed for %s: %s", key, exc) + + # ── synchronous facade ───────────────────────────────────────────── # A few legacy writers are plain sync functions (track-changes save, retrofit # backup, the multimodal thumbnail renderer which runs in a worker thread via @@ -511,3 +548,15 @@ def put_file_sync(src, key, *, bucket=Bucket.DOCUMENTS, content_type=None, metadata=None) -> str: return _run_coro_blocking( put_file(src, key, bucket=bucket, content_type=content_type, metadata=metadata)) + + +def mirror_sync(key, data, *, bucket=Bucket.DOCUMENTS, content_type=None, + metadata=None) -> None: + _run_coro_blocking(mirror(key, data, bucket=bucket, + content_type=content_type, metadata=metadata)) + + +def mirror_file_sync(src, key, *, bucket=Bucket.DOCUMENTS, content_type=None, + metadata=None) -> None: + _run_coro_blocking(mirror_file(src, key, bucket=bucket, + content_type=content_type, metadata=metadata)) diff --git a/mcp-server/tests/test_storage_mirror.py b/mcp-server/tests/test_storage_mirror.py new file mode 100644 index 0000000..d9e9ce4 --- /dev/null +++ b/mcp-server/tests/test_storage_mirror.py @@ -0,0 +1,74 @@ +"""Tests for storage.mirror — the INV-STG1 dual-write seal. + +mirror() must: be a no-op under the filesystem backend (the disk write is +canonical), persist to the S3 sub-backend under s3/dual, and never raise (an S3 +failure is logged, the request proceeds on the disk copy). Offline. +""" + +from __future__ import annotations + +import asyncio + +import pytest + +from legal_mcp.services import storage + + +class _FakeS3: + def __init__(self, fail: bool = False) -> None: + self.fail = fail + self.puts: list[tuple] = [] + + async def put_bytes(self, key, data, *, bucket, content_type=None, metadata=None): + if self.fail: + raise RuntimeError("s3 down") + self.puts.append((key, bytes(data), bucket)) + return f"s3://{bucket}/{key}" + + +class _FakeFilesystem: + name = "filesystem" + + +class _FakeDual: + name = "dual" + + def __init__(self, s3: _FakeS3) -> None: + self.s3 = s3 + + +def _run(coro): + loop = asyncio.new_event_loop() + try: + return loop.run_until_complete(coro) + finally: + loop.close() + + +def test_mirror_noop_under_filesystem(monkeypatch): + monkeypatch.setattr(storage, "get_storage", lambda: _FakeFilesystem()) + # must not raise and must not attempt any S3 work + _run(storage.mirror("cases/x/y.pdf", b"data", bucket=storage.Bucket.DOCUMENTS)) + + +def test_mirror_persists_under_dual(monkeypatch): + s3 = _FakeS3() + monkeypatch.setattr(storage, "get_storage", lambda: _FakeDual(s3)) + _run(storage.mirror("cases/x/y.pdf", b"data", bucket=storage.Bucket.DOCUMENTS)) + assert s3.puts == [("cases/x/y.pdf", b"data", storage.Bucket.DOCUMENTS)] + + +def test_mirror_best_effort_never_raises(monkeypatch): + s3 = _FakeS3(fail=True) + monkeypatch.setattr(storage, "get_storage", lambda: _FakeDual(s3)) + # S3 failure must be swallowed-with-log, never propagate (disk copy holds) + _run(storage.mirror("cases/x/y.pdf", b"data", bucket=storage.Bucket.DOCUMENTS)) + + +def test_mirror_uses_backend_itself_when_no_s3_attr(monkeypatch): + # a pure s3 backend has no .s3 sub-attr → getattr falls back to the backend + s3 = _FakeS3() + s3.name = "s3" + monkeypatch.setattr(storage, "get_storage", lambda: s3) + _run(storage.mirror("k", b"d", bucket=storage.Bucket.DERIVED)) + assert s3.puts == [("k", b"d", storage.Bucket.DERIVED)] diff --git a/mcp-server/tests/test_storage_write_leak_guard.py b/mcp-server/tests/test_storage_write_leak_guard.py new file mode 100644 index 0000000..7f3af66 --- /dev/null +++ b/mcp-server/tests/test_storage_write_leak_guard.py @@ -0,0 +1,62 @@ +"""INV-STG1 leak-guard — no blob may be written to disk without going through, or +being mirrored to, the storage layer (services/storage.py). + +After the cutover to ``STORAGE_BACKEND=s3`` a direct disk write under DATA_DIR +that bypasses storage creates an orphan: a file in the old folders that never +reaches MinIO (lost on cleanup, not served, not backed up). This static guard +fails CI on any NEW direct blob-write (``write_bytes``/``write_text``/ +``shutil.copy*``/``shutil.move``/``open(...,'wb')``) in the web API or services +that is not explicitly acknowledged with a ``# noqa: STG1`` marker. + +Marking a line means the author has CONSCIOUSLY handled it — either sealed it +(``_seal_blob`` / ``storage.mirror`` right after, for paths the disk-based +pipeline still reads) or justified it as benign (temp file, staging-then-unlink, +git-per-case metadata, log/flag, BytesIO buffer, storage fallback). New +unmarked writes block the build until the author does the same. +""" + +from __future__ import annotations + +import re +from pathlib import Path + +import pytest + +_ROOT = Path(__file__).resolve().parents[2] +# storage.py is the storage layer itself — its disk writes ARE the implementation. +_EXCLUDE = {"storage.py"} +_SCAN = [ + _ROOT / "web" / "app.py", + *(p for p in sorted((_ROOT / "mcp-server" / "src" / "legal_mcp" / "services").glob("*.py")) + if p.name not in _EXCLUDE), +] + +# Direct-disk-write patterns that could land a blob in the old folders. +_PATTERNS = re.compile( + r"\.write_bytes\(|\.write_text\(|shutil\.copy2?\(|shutil\.move\(|open\([^)]*,\s*['\"][wax]b?['\"]" +) +_MARKER = "noqa: STG1" + + +def _violations() -> list[str]: + out: list[str] = [] + for f in _SCAN: + if not f.exists(): + continue + for i, line in enumerate(f.read_text(encoding="utf-8").splitlines(), 1): + s = line.strip() + if s.startswith("#"): + continue + if _PATTERNS.search(line) and _MARKER not in line: + out.append(f"{f.relative_to(_ROOT)}:{i}: {s[:100]}") + return out + + +def test_no_unmarked_blob_disk_writes(): + violations = _violations() + assert not violations, ( + "INV-STG1: direct blob-disk-write(s) without a `# noqa: STG1` marker — " + "seal each via `_seal_blob`/`storage.mirror` (if the pipeline reads the " + "disk path) or justify it as benign on the line:\n " + + "\n ".join(violations) + ) diff --git a/scripts/SCRIPTS.md b/scripts/SCRIPTS.md index 8c01503..bc76be3 100644 --- a/scripts/SCRIPTS.md +++ b/scripts/SCRIPTS.md @@ -48,6 +48,7 @@ | `backfill_nevo_preamble.py` | python | **#86.2** — מיגרציית-נתונים: חיתוך preamble/רציו של נבו שדלף לפסיקה שהוטמעה לפני תיקון #86.1. מאתר כל `case_law` ש-`strip_nevo_preamble(full_text)` עדיין מקצר (דליפה היסטורית), ומבצע: (1) לכידת ה-מיני-רציו ל-`case_law.nevo_ratio` (gold-set ל-#86.3); (2) שכתוב `full_text` החתוך + חישוב-מחדש של `content_hash`; (3) `reindex_case_law` (re-chunk+embed, ללא re-OCR/LLM); (4) **סימון (לא מחיקה)** הלכות ש-`supporting_quote` שלהן בתוך ה-preamble שהוסר → `pending_review` + quality_flag `nevo_preamble_leak`. **שומר-בטיחות:** שורות עם keep%<`--min-keep` (ברירת-מחדל 60) מוחרגות מ-`--apply` כחשד over-strip (אלא אם `--include-suspicious`). **dry-run כברירת-מחדל**; `--apply` כותב backup JSON + manifest CSV ל-`data/audit/` תחילה. idempotent. רץ עם venv של mcp-server. **chair-gated** (לאמת manifest לפני apply) | מיגרציית-נתונים — dry-run בוצע (19 פסקים, 27 הלכות מזוהמות); apply ממתין לאישור | | `nevo_ratio_benchmark.py` | python | **#86.3** — מדידת איכות חילוץ-הלכות מול ה-מיני-רציו של נבו (gold-set מקצועי חינמי). לכל פסק עם `nevo_ratio` (או נגזר מ-`full_text` אם טרם בוצע backfill): LLM-judge מקומי (`claude_session`, אפס עלות) ממפה סמנטית את הלכות-המערכת מול הלכות-נבו ומפיק **recall** (כיסוי הלכות-נבו), **precision** (אחוז הלכותינו הממופות), **granularity** (יחס פירוק — איתות over-extraction ל-#81.5). `--case ` / `--all [--limit N]` / `--model` / `--out`. כותב CSV ל-`data/audit/`. רץ עם venv של mcp-server (דורש Claude CLI מקומי). אומת על בג"ץ 1764/05: recall 0.875, precision 1.0, granularity 1.75x | ידני — מדידת-איכות (CI/ad-hoc) | | `migrate_blobs_to_minio.py` | python | **#106.4 — הגירת בלובים לדיסק→MinIO (DB-driven, dry-run-default).** סורק 6 עמודות-נתיב (documents.file_path · cases.active_draft_path · digests.source_document_path · draft_final_pairs.final_path · *_image_embeddings.image_thumbnail_path), מנרמל 3 פורמטי-נתיב legacy (container-abs `/data/`, host-abs, relative) ל-key יחסי-DATA_DIR, וגוזר bucket per-file-semantic (מסמך→documents, thumbnail→derived). dry-run מפיק תוכנית+מניפסט CSV (data/audit) + מדווח חסרים; `--apply` מעלה דרך mcli ומאמת size (דיסק לא נוגע → הפיך). אומת 2026-06-11: 3404 קבצים/899MB, 0 outside, 28 חסרים. **חובה mcli alias legalminio**. | ידני — הגירת-אחסון X14 | +| `storage_leak_tripwire.py` | python | **INV-STG1 tripwire (ניטור-ריצה).** משלים את ה-CI leak-guard: סורק בלובים ב-data/{cases,precedent-library,internal-decisions,digests,training} ומשווה מול ה-key-sets החיים של legal-documents/legal-derived (json-key match, סיווג bucket per-file כמו בהגירה). מדווח בלובים שדלפו (בדיסק אך לא ב-MinIO → יאבדו בניקוי, לא מוגשים/מגובים). read-only, `--since `. אומת: 0 דליפות. **חובה מקומי** (mcli legalminio). | תקופתי / לפני ניקוי-דיסק #128 | | `nevo_corpus_audit.py` | python | **#86.2/#86.3 — אודיט קורפוס-נבו (read-only).** `leak` סורק chunks+הלכות למרקרי-preamble של נבו (מיובאים מ-extractor._NEVO_MARKERS), מבחין בין הווקטור המזיק (מרקר בתוך הלכה=רציו-עריכה כהלכה) ל-benign (רשימת-ציטוטים), ומפיק CSV. אומת 2026-06-11: **0 הלכות מזוהמות** (שכבת-הידע נקייה) → אין purge/re-ingest (גם נוגד no-reocr). `leak --apply` מבצע backfill **אדיטיבי** של `case_law.nevo_ratio` מטקסט שמור (extract_nevo_ratio, ללא re-OCR) — captured 16→32. `benchmark` משווה הלכות-שלנו מול ה-מיני-רציו דרך הפאנל התלת-מודלי → recall כיסוי (1110-20: 13 הלכות, recall=1.0). **חובה מקומי** (benchmark). | ידני — ניטור-זיהום / ground-truth | | `halacha_goldset.py` | python | **#81.7** — הארנס gold-set לאיכות חילוץ-הלכות. `export --n N` מייצא מדגם מרובד (לפי precedent×rule_type) ל-CSV עם עמודות-תיוג ריקות (`is_holding`/`correct_type`/`quote_complete`) לתיוג ידני (חיים/דפנה). `score --in ` קורא את ה-CSV המתויג ומודד כל ולידטור (`compute_quality_flags`/`is_fact_dependent`/`is_quote_truncated`/`is_thin_restatement`) מול אמת-המידה האנושית: P/R/F1 + confusion. בסיס ל-#81.8 (כיול סף האישור). מייבא את אותם ולידטורים שה-extractor מריץ. רץ עם venv של mcp-server. **הערה:** קיים גם דף-תיוג אינטראקטיבי DB-backed (`/goldset`) — זה ה-CSV-fallback | ידני — export→תיוג→score | | `goldset_panel_label.py` | python | **#81.7 — תיוג ה-gold-set בקונצנזוס תלת-מודלי (ללא man-in-the-loop, הנחיית-יו"ר 2026-06-11).** מריץ את שלושת השופטים העצמאיים (Opus/claude_session · DeepSeek · Gemini, מיובאים מ-`halacha_panel_approve`) עם ה-prompt העשיר (`is_holding`+`type`+נימוק מ-`goldset_ai_recommend`) על כל פריט; **רוב 2/3 נכתב ל-`is_holding`/`correct_type`** עם `tagged_by='panel:opus+deepseek+gemini'` (פיצול→NULL→יו"ר, INV-G10). מודד **Fleiss κ** (3 מעריכים) ומריץ **מבחן-אנונימיזציה** (שמות-תיק ממוסכים→שיפוט-מחדש; flip=שינון). לא מעגלי — הוולידטורים הנמדדים rule-based. כותב per-model+consensus+anon ל-DB ודוח ל-`data/audit/`. **מחליף** תיוג-ידני; `goldset_ai_recommend`/`goldset_independent_judge` נשארים כבדיקות single-model. `--limit`/`--no-anon`/`--force`. **חובה מקומי**. | ידני — לאחר יצירת/הרחבת batch | diff --git a/scripts/storage_leak_tripwire.py b/scripts/storage_leak_tripwire.py new file mode 100644 index 0000000..3b6ee8c --- /dev/null +++ b/scripts/storage_leak_tripwire.py @@ -0,0 +1,97 @@ +#!/usr/bin/env python3 +"""INV-STG1 runtime tripwire — detect blobs that leaked to the old disk folders +without reaching MinIO (the detective control complementing the CI leak-guard). + +After the s3-only cutover, every blob written under DATA_DIR/{cases, +precedent-library,internal-decisions,digests,training} should ALSO be in MinIO +(the upload/finalize paths keep a disk copy for the pipeline but mirror to S3 via +storage.mirror — see web/app.py _seal_blob). A file present on disk but ABSENT +from the matching S3 bucket means a write bypassed the seal → it would be lost on +disk cleanup and is not served/backed-up. This script reports them. + +Classifies disk files into documents/derived buckets exactly like the migration +(``*/extracted/*`` and ``*/thumbnails/*`` → legal-derived; the rest → legal- +documents) and compares against the live bucket key-sets (proper JSON key match, +so Hebrew filenames with spaces compare correctly). Read-only. + +Run locally (needs the `legalminio` mcli alias): + python3 scripts/storage_leak_tripwire.py # full scan + python3 scripts/storage_leak_tripwire.py --since 2026-06-11 # only newer files +""" +from __future__ import annotations + +import argparse +import json +import subprocess +import sys +from pathlib import Path + +MCLI = str(Path.home() / ".local" / "bin" / "mcli") +DATA = Path("/home/chaim/legal-ai/data") +CATS = ["cases", "precedent-library", "internal-decisions", "digests", "training"] +# non-blob disk files that legitimately stay on disk / in git-per-case +SKIP_SUFFIX = {".tmp", ".log"} +SKIP_NAME = {"case.json", "notes.md", ".pull.log"} + + +def _bucket_for(rel: str) -> str: + return ("legal-derived" if ("/extracted/" in rel or "/thumbnails/" in rel) + else "legal-documents") + + +def _s3_keys(bucket: str) -> set[str]: + out = subprocess.run([MCLI, "ls", "--recursive", "--json", f"legalminio/{bucket}"], + capture_output=True, text=True, env={"TERM": "xterm", "HOME": str(Path.home())}) + keys: set[str] = set() + for ln in out.stdout.splitlines(): + try: + k = json.loads(ln).get("key", "") + except json.JSONDecodeError: + continue + if k and "/.git/" not in k: + keys.add(k) + return keys + + +def main(args) -> int: + s3 = {b: _s3_keys(b) for b in ("legal-documents", "legal-derived")} + since = None + if args.since: + import datetime + since = datetime.datetime.fromisoformat(args.since).timestamp() + + leaked: list[str] = [] + scanned = 0 + for cat in CATS: + root = DATA / cat + if not root.exists(): + continue + for f in root.rglob("*"): + if not f.is_file() or "/.git/" in f.as_posix(): + continue + if f.suffix in SKIP_SUFFIX or f.name in SKIP_NAME: + continue + if since and f.stat().st_mtime < since: + continue + rel = f.relative_to(DATA).as_posix() + scanned += 1 + if rel not in s3[_bucket_for(rel)]: + leaked.append(rel) + + print(f"scanned {scanned} disk blobs across {CATS}") + if not leaked: + print("✓ no leaks — every disk blob is present in MinIO.") + return 0 + print(f"⚠ {len(leaked)} LEAKED blobs (on disk, NOT in MinIO):") + for r in leaked[:50]: + print(f" {r} → expected in {_bucket_for(r)}") + if len(leaked) > 50: + print(f" … and {len(leaked) - 50} more") + return 1 + + +if __name__ == "__main__": + ap = argparse.ArgumentParser(description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter) + ap.add_argument("--since", help="ISO date — only check files modified on/after") + sys.exit(main(ap.parse_args())) diff --git a/web/app.py b/web/app.py index 3446708..491e3ed 100644 --- a/web/app.py +++ b/web/app.py @@ -146,7 +146,8 @@ async def upload_file(file: UploadFile = File(...)): raise HTTPException(400, f"File too large. Max: {MAX_FILE_SIZE // (1024*1024)}MB") dest = UPLOAD_DIR / filename - dest.write_bytes(content) + dest.write_bytes(content) # noqa: STG1 — sealed below + await _seal_blob(dest, content) return { "filename": filename, @@ -301,12 +302,14 @@ async def _process_proofread_training( # Copy original to training dir original_name = re.sub(r"^\d+_", "", source.name) orig_dest = training_dir / original_name - shutil.copy2(str(source), str(orig_dest)) + shutil.copy2(str(source), str(orig_dest)) # noqa: STG1 — sealed below + await _seal_blob_file(orig_dest) # Save cleaned version proofread_name = Path(original_name).stem + ".md" proofread_dest = proofread_dir / proofread_name - proofread_dest.write_text(clean_text, encoding="utf-8") + proofread_dest.write_text(clean_text, encoding="utf-8") # noqa: STG1 — sealed below + await _seal_blob(proofread_dest, clean_text.encode("utf-8")) # 3. Parse date d_date = None @@ -1405,7 +1408,7 @@ async def create_curator_proposal(body: CuratorProposal): f"## נימוק\n\n{body.rationale.strip() or '(לא ניתן)'}\n" ) try: - path.write_text(md, encoding="utf-8") + path.write_text(md, encoding="utf-8") # noqa: STG1 — curator proposal state, not a corpus blob except OSError as e: raise HTTPException(500, f"failed to write proposal: {e}") return { @@ -2846,6 +2849,31 @@ async def serve_blob( return FileResponse(path, media_type=media_type, filename=filename) +async def _seal_blob(dest: Path, content: bytes, + *, bucket=storage.Bucket.DOCUMENTS) -> None: + """Mirror a just-written disk blob to object storage (INV-STG1 seal). + + The ingest/extract pipeline still reads ``dest`` from its DATA_DIR path, so + these endpoints keep the disk copy; this ALSO persists the blob to MinIO so + nothing written to the old folders is ever missing from object storage + (durability + presigned serving). No-op under the filesystem backend; + best-effort under s3/dual (logged, never breaks the request).""" + try: + key = dest.resolve().relative_to(Path(config.DATA_DIR).resolve()).as_posix() + except ValueError: + return # outside DATA_DIR → not a managed blob + await storage.mirror(key, content, bucket=bucket) + + +async def _seal_blob_file(dest: Path, *, bucket=storage.Bucket.DOCUMENTS) -> None: + """``_seal_blob`` for a file already on disk (e.g. after shutil.copy).""" + try: + key = dest.resolve().relative_to(Path(config.DATA_DIR).resolve()).as_posix() + except ValueError: + return + await storage.mirror_file(dest, key, bucket=bucket) + + @app.get("/api/cases/{case_number}/local-files/{folder}/{filename}") async def api_read_local_file(case_number: str, folder: str, filename: str): """Read contents of a local case file.""" @@ -2952,7 +2980,7 @@ async def api_research_analysis_upload( tmp = dest.with_suffix(".md.upload-tmp") try: dest.parent.mkdir(parents=True, exist_ok=True) - tmp.write_text(text, encoding="utf-8") + tmp.write_text(text, encoding="utf-8") # noqa: STG1 — atomic upload .tmp, replaced below parsed = research_md.parse(tmp) except Exception as e: tmp.unlink(missing_ok=True) @@ -2986,7 +3014,8 @@ async def api_research_analysis_upload( backup_dir.mkdir(exist_ok=True) ts = time.strftime("%Y%m%d-%H%M%S") backup_path = backup_dir / f"analysis-and-research-{ts}.md" - shutil.copy2(dest, backup_path) + shutil.copy2(dest, backup_path) # noqa: STG1 — sealed below + await _seal_blob_file(backup_path) # Replace with uploaded file tmp.replace(dest) @@ -3096,7 +3125,8 @@ async def api_precedent_upload_pdf( while dest.exists(): dest = case_dir / f"{safe_name or 'precedent'}-{counter}{ext}" counter += 1 - dest.write_bytes(content) + dest.write_bytes(content) # noqa: STG1 — sealed below + await _seal_blob(dest, content) case_id = UUID(case["id"]) doc = await db.create_document( @@ -3227,7 +3257,8 @@ async def api_upload_export(case_number: str, file: UploadFile = File(...)): pass dest = export_dir / f"עריכה-v{next_ver}.docx" - dest.write_bytes(content) + dest.write_bytes(content) # noqa: STG1 — sealed below + await _seal_blob(dest, content) # Auto-register as active_draft + retrofit bookmarks auto_result: dict = {"status": "ok"} @@ -3382,12 +3413,14 @@ async def api_mark_final(case_number: str, filename: str): # Rename/copy to final final_name = f"סופי-{case_number}.docx" final_path = export_dir / final_name - shutil.copy2(str(source), str(final_path)) + shutil.copy2(str(source), str(final_path)) # noqa: STG1 — sealed below + await _seal_blob_file(final_path) # Also copy to training directory for future style learning config.TRAINING_DIR.mkdir(parents=True, exist_ok=True) training_dest = config.TRAINING_DIR / f"החלטה-{case_number}.docx" - shutil.copy2(str(source), str(training_dest)) + shutil.copy2(str(source), str(training_dest)) # noqa: STG1 — sealed below + await _seal_blob_file(training_dest) # Update case status to final pool = await db.get_pool() @@ -3635,13 +3668,15 @@ async def api_upload_final_decision(case_number: str, file: UploadFile = File(.. export_dir.mkdir(parents=True, exist_ok=True) final_name = f"סופי-{case_number}.docx" final_path = export_dir / final_name - final_path.write_bytes(content) + final_path.write_bytes(content) # noqa: STG1 — sealed below + await _seal_blob(final_path, content) # Enroll in the style corpus. Use the FULL case_number as decision_number so a # בל"מ never collides with a same-numbered ערר already in the corpus (e.g. ARAR-25-8126). config.TRAINING_DIR.mkdir(parents=True, exist_ok=True) training_dest = config.TRAINING_DIR / f"החלטה-{case_number}.docx" - shutil.copy2(str(final_path), str(training_dest)) + shutil.copy2(str(final_path), str(training_dest)) # noqa: STG1 — sealed below + await _seal_blob_file(training_dest) # Extract the final text (word count for the UI; full text snapshotted into the pair). final_text = "" @@ -4977,7 +5012,7 @@ async def api_install_skill(file: UploadFile = File(...)): dest = skill_dir / rel_path dest.parent.mkdir(parents=True, exist_ok=True) - dest.write_bytes(zf.read(name)) + dest.write_bytes(zf.read(name)) # noqa: STG1 — extracts to ~/.paperclip skills (outside DATA_DIR) extracted_files.append(rel_path) zf.close() @@ -5149,7 +5184,7 @@ async def api_restart_paperclip(): # Fallback: write a flag file that host-side watcher picks up flag_file = PAPERCLIP_SKILLS_DIR / ".restart-requested" try: - flag_file.write_text(str(time.time())) + flag_file.write_text(str(time.time())) # noqa: STG1 — restart flag (state) return { "status": "restart_requested", "method": "flag_file", @@ -5202,7 +5237,8 @@ async def api_upload_tagged_document( dest = case_dir / f"{stem}-{counter}{ext}" counter += 1 - dest.write_bytes(content) + dest.write_bytes(content) # noqa: STG1 — sealed below + await _seal_blob(dest, content) # Create document record case_id = UUID(case["id"]) @@ -5742,7 +5778,8 @@ async def _process_case_document(task_id: str, source: Path, req: ClassifyReques # Use original name without timestamp prefix original_name = re.sub(r"^\d+_", "", source.name) dest = case_dir / original_name - shutil.copy2(str(source), str(dest)) + shutil.copy2(str(source), str(dest)) # noqa: STG1 — sealed below + await _seal_blob_file(dest) # Create document record await _progress.set(task_id, {"status": "registering", "filename": req.filename}) @@ -5792,7 +5829,8 @@ async def _process_training_document(task_id: str, source: Path, req: ClassifyRe config.TRAINING_DIR.mkdir(parents=True, exist_ok=True) original_name = re.sub(r"^\d+_", "", source.name) dest = config.TRAINING_DIR / original_name - shutil.copy2(str(source), str(dest)) + shutil.copy2(str(source), str(dest)) # noqa: STG1 — sealed below + await _seal_blob_file(dest) # Extract text await _progress.set(task_id, {"status": "processing", "filename": req.filename, "step": "extracting"}) @@ -6865,7 +6903,8 @@ async def bulletin_upload(file: UploadFile = File(...)): # Idempotent: same content (any filename) already staged → skip. if any(p.name.startswith(f"{digest}_") for p in _BULLETINS_DIR.glob(f"{digest}_*")): return {"status": "exists", "filename": dest.name, "size": len(content)} - dest.write_bytes(content) + dest.write_bytes(content) # noqa: STG1 — sealed below + await _seal_blob(dest, content) return {"status": "stored", "filename": dest.name, "size": len(content)}