feat(storage): seal INV-STG1 write path — 15 dual-write seals + CI leak-guard + tripwire
All checks were successful
G12 Leak-Guard / leak-guard (pull_request) Successful in 5s

אחרי ה-cutover ל-s3-only, אודיט מצא 15 אתרי-כתיבת-בלוב שעוקפים את storage.py (uploads/
finalize/exports/training/research-backup/precedents/bulletins/draft) — קובץ ינחת
בתיקיות-הישנות אך **לא** ב-MinIO → יאבד בניקוי, לא מוגש, לא מגובה. ה-pipeline (ingest/
extract) עדיין קורא לפי file_path מהדיסק, אז ביטול-מוחלט של כתיבה-לדיסק דורש read-wiring
מלא (Phase 2, משימה נפרדת). תיקון בטוח עכשיו = **dual-write seal**.

- storage.py: `mirror`/`mirror_file` (+ sync) — best-effort persist ל-S3 כשה-backend
  s3/dual (no-op ב-filesystem; כשל S3 נרשם, לא שובר request — DualBackend philosophy).
- web/app.py: helpers `_seal_blob`/`_seal_blob_file` + 14 אתרים אטומים (storage.mirror
  אחרי כתיבת-הדיסק; הדיסק נשאר ל-pipeline). block_writer.py: draft אטום (async).
- **CI leak-guard** (test_storage_write_leak_guard): נכשל על כל כתיבת-בלוב-לדיסק
  (write_bytes/write_text/shutil.copy*/open(wb)) ב-web/+services ללא מרקר `# noqa: STG1`.
  כל ה-benign (fallbacks/tmp/staging/git-metadata/flag/state) מסומנים עם נימוק. storage.py
  מוחרג (הוא המימוש).
- **tripwire** (scripts/storage_leak_tripwire.py): ניטור-ריצה — בלובים בדיסק שלא ב-MinIO
  (json-key match, bucket per-file). אומת חי: 0 דליפות.

invariants: INV-STG1 (כל I/O דרך storage / ממורר אליו) · INV-STG6 · feedback_silent_swallow
(mirror רושם warning, לא bare-except). Phase 2 (read-wire ה-pipeline → להפיל את עותק-הדיסק)
= follow-up. tests: 4 mirror + 1 leak-guard + 6 serve_blob + 18 storage קיימות עוברות.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-11 19:57:12 +00:00
parent 24480950f1
commit 0d8cc31a2b
11 changed files with 355 additions and 25 deletions

View File

@@ -146,7 +146,8 @@ async def upload_file(file: UploadFile = File(...)):
raise HTTPException(400, f"File too large. Max: {MAX_FILE_SIZE // (1024*1024)}MB")
dest = UPLOAD_DIR / filename
dest.write_bytes(content)
dest.write_bytes(content) # noqa: STG1 — sealed below
await _seal_blob(dest, content)
return {
"filename": filename,
@@ -301,12 +302,14 @@ async def _process_proofread_training(
# Copy original to training dir
original_name = re.sub(r"^\d+_", "", source.name)
orig_dest = training_dir / original_name
shutil.copy2(str(source), str(orig_dest))
shutil.copy2(str(source), str(orig_dest)) # noqa: STG1 — sealed below
await _seal_blob_file(orig_dest)
# Save cleaned version
proofread_name = Path(original_name).stem + ".md"
proofread_dest = proofread_dir / proofread_name
proofread_dest.write_text(clean_text, encoding="utf-8")
proofread_dest.write_text(clean_text, encoding="utf-8") # noqa: STG1 — sealed below
await _seal_blob(proofread_dest, clean_text.encode("utf-8"))
# 3. Parse date
d_date = None
@@ -1378,7 +1381,7 @@ async def create_curator_proposal(body: CuratorProposal):
f"## נימוק\n\n{body.rationale.strip() or '(לא ניתן)'}\n"
)
try:
path.write_text(md, encoding="utf-8")
path.write_text(md, encoding="utf-8") # noqa: STG1 — curator proposal state, not a corpus blob
except OSError as e:
raise HTTPException(500, f"failed to write proposal: {e}")
return {
@@ -2819,6 +2822,31 @@ async def serve_blob(
return FileResponse(path, media_type=media_type, filename=filename)
async def _seal_blob(dest: Path, content: bytes,
*, bucket=storage.Bucket.DOCUMENTS) -> None:
"""Mirror a just-written disk blob to object storage (INV-STG1 seal).
The ingest/extract pipeline still reads ``dest`` from its DATA_DIR path, so
these endpoints keep the disk copy; this ALSO persists the blob to MinIO so
nothing written to the old folders is ever missing from object storage
(durability + presigned serving). No-op under the filesystem backend;
best-effort under s3/dual (logged, never breaks the request)."""
try:
key = dest.resolve().relative_to(Path(config.DATA_DIR).resolve()).as_posix()
except ValueError:
return # outside DATA_DIR → not a managed blob
await storage.mirror(key, content, bucket=bucket)
async def _seal_blob_file(dest: Path, *, bucket=storage.Bucket.DOCUMENTS) -> None:
"""``_seal_blob`` for a file already on disk (e.g. after shutil.copy)."""
try:
key = dest.resolve().relative_to(Path(config.DATA_DIR).resolve()).as_posix()
except ValueError:
return
await storage.mirror_file(dest, key, bucket=bucket)
@app.get("/api/cases/{case_number}/local-files/{folder}/{filename}")
async def api_read_local_file(case_number: str, folder: str, filename: str):
"""Read contents of a local case file."""
@@ -2925,7 +2953,7 @@ async def api_research_analysis_upload(
tmp = dest.with_suffix(".md.upload-tmp")
try:
dest.parent.mkdir(parents=True, exist_ok=True)
tmp.write_text(text, encoding="utf-8")
tmp.write_text(text, encoding="utf-8") # noqa: STG1 — atomic upload .tmp, replaced below
parsed = research_md.parse(tmp)
except Exception as e:
tmp.unlink(missing_ok=True)
@@ -2959,7 +2987,8 @@ async def api_research_analysis_upload(
backup_dir.mkdir(exist_ok=True)
ts = time.strftime("%Y%m%d-%H%M%S")
backup_path = backup_dir / f"analysis-and-research-{ts}.md"
shutil.copy2(dest, backup_path)
shutil.copy2(dest, backup_path) # noqa: STG1 — sealed below
await _seal_blob_file(backup_path)
# Replace with uploaded file
tmp.replace(dest)
@@ -3069,7 +3098,8 @@ async def api_precedent_upload_pdf(
while dest.exists():
dest = case_dir / f"{safe_name or 'precedent'}-{counter}{ext}"
counter += 1
dest.write_bytes(content)
dest.write_bytes(content) # noqa: STG1 — sealed below
await _seal_blob(dest, content)
case_id = UUID(case["id"])
doc = await db.create_document(
@@ -3200,7 +3230,8 @@ async def api_upload_export(case_number: str, file: UploadFile = File(...)):
pass
dest = export_dir / f"עריכה-v{next_ver}.docx"
dest.write_bytes(content)
dest.write_bytes(content) # noqa: STG1 — sealed below
await _seal_blob(dest, content)
# Auto-register as active_draft + retrofit bookmarks
auto_result: dict = {"status": "ok"}
@@ -3355,12 +3386,14 @@ async def api_mark_final(case_number: str, filename: str):
# Rename/copy to final
final_name = f"סופי-{case_number}.docx"
final_path = export_dir / final_name
shutil.copy2(str(source), str(final_path))
shutil.copy2(str(source), str(final_path)) # noqa: STG1 — sealed below
await _seal_blob_file(final_path)
# Also copy to training directory for future style learning
config.TRAINING_DIR.mkdir(parents=True, exist_ok=True)
training_dest = config.TRAINING_DIR / f"החלטה-{case_number}.docx"
shutil.copy2(str(source), str(training_dest))
shutil.copy2(str(source), str(training_dest)) # noqa: STG1 — sealed below
await _seal_blob_file(training_dest)
# Update case status to final
pool = await db.get_pool()
@@ -3608,13 +3641,15 @@ async def api_upload_final_decision(case_number: str, file: UploadFile = File(..
export_dir.mkdir(parents=True, exist_ok=True)
final_name = f"סופי-{case_number}.docx"
final_path = export_dir / final_name
final_path.write_bytes(content)
final_path.write_bytes(content) # noqa: STG1 — sealed below
await _seal_blob(final_path, content)
# Enroll in the style corpus. Use the FULL case_number as decision_number so a
# בל"מ never collides with a same-numbered ערר already in the corpus (e.g. ARAR-25-8126).
config.TRAINING_DIR.mkdir(parents=True, exist_ok=True)
training_dest = config.TRAINING_DIR / f"החלטה-{case_number}.docx"
shutil.copy2(str(final_path), str(training_dest))
shutil.copy2(str(final_path), str(training_dest)) # noqa: STG1 — sealed below
await _seal_blob_file(training_dest)
# Extract the final text (word count for the UI; full text snapshotted into the pair).
final_text = ""
@@ -4950,7 +4985,7 @@ async def api_install_skill(file: UploadFile = File(...)):
dest = skill_dir / rel_path
dest.parent.mkdir(parents=True, exist_ok=True)
dest.write_bytes(zf.read(name))
dest.write_bytes(zf.read(name)) # noqa: STG1 — extracts to ~/.paperclip skills (outside DATA_DIR)
extracted_files.append(rel_path)
zf.close()
@@ -5122,7 +5157,7 @@ async def api_restart_paperclip():
# Fallback: write a flag file that host-side watcher picks up
flag_file = PAPERCLIP_SKILLS_DIR / ".restart-requested"
try:
flag_file.write_text(str(time.time()))
flag_file.write_text(str(time.time())) # noqa: STG1 — restart flag (state)
return {
"status": "restart_requested",
"method": "flag_file",
@@ -5175,7 +5210,8 @@ async def api_upload_tagged_document(
dest = case_dir / f"{stem}-{counter}{ext}"
counter += 1
dest.write_bytes(content)
dest.write_bytes(content) # noqa: STG1 — sealed below
await _seal_blob(dest, content)
# Create document record
case_id = UUID(case["id"])
@@ -5715,7 +5751,8 @@ async def _process_case_document(task_id: str, source: Path, req: ClassifyReques
# Use original name without timestamp prefix
original_name = re.sub(r"^\d+_", "", source.name)
dest = case_dir / original_name
shutil.copy2(str(source), str(dest))
shutil.copy2(str(source), str(dest)) # noqa: STG1 — sealed below
await _seal_blob_file(dest)
# Create document record
await _progress.set(task_id, {"status": "registering", "filename": req.filename})
@@ -5765,7 +5802,8 @@ async def _process_training_document(task_id: str, source: Path, req: ClassifyRe
config.TRAINING_DIR.mkdir(parents=True, exist_ok=True)
original_name = re.sub(r"^\d+_", "", source.name)
dest = config.TRAINING_DIR / original_name
shutil.copy2(str(source), str(dest))
shutil.copy2(str(source), str(dest)) # noqa: STG1 — sealed below
await _seal_blob_file(dest)
# Extract text
await _progress.set(task_id, {"status": "processing", "filename": req.filename, "step": "extracting"})
@@ -6838,7 +6876,8 @@ async def bulletin_upload(file: UploadFile = File(...)):
# Idempotent: same content (any filename) already staged → skip.
if any(p.name.startswith(f"{digest}_") for p in _BULLETINS_DIR.glob(f"{digest}_*")):
return {"status": "exists", "filename": dest.name, "size": len(content)}
dest.write_bytes(content)
dest.write_bytes(content) # noqa: STG1 — sealed below
await _seal_blob(dest, content)
return {"status": "stored", "filename": dest.name, "size": len(content)}