git_sync: full case-dir backup to Gitea (sweep + explicit commits)
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 1m25s

The case repo is the user's backup, so anything in the dir must end up
on Gitea. Two layers:

1. Periodic sweep (every 30s) — git_sync.sweep_loop runs as a FastAPI
   background task. It scans every case dir, runs git status --porcelain
   on each, and commit_and_push's any dirty changes with an auto-built
   Hebrew message ("אוטו: טיוטות (2) · מסמכים"). Catches files written
   outside the API path: agent research artefacts, manual edits, etc.

2. Explicit commits at known write paths — DOCX export, interim draft,
   apply_user_edit, revise_draft, mark-final, analysis DOCX export.
   These give immediate feedback with descriptive messages instead of
   waiting up to 30s for the sweep.

safe.directory injection added to _git_env so sweep + explicit commits
work even when the running uid differs from the case-dir owner (host
runs vs. uniform-root container).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-30 18:27:36 +00:00
parent 6a38789379
commit f256eddbb1
3 changed files with 154 additions and 8 deletions

View File

@@ -6,15 +6,23 @@ rotated in Infisical, repos created with the old token will fail to
push silently — only logged at WARNING level. ``commit_and_push``
re-injects the *current* token into the existing origin URL on every
call, so push survives token rotation.
This module also runs a periodic ``sweep_loop`` that catches files
written outside the API path (most importantly: agents writing research
artefacts directly to the case dir). The full case repo is the user's
backup, so anything in the dir must end up on Gitea.
"""
from __future__ import annotations
import asyncio
import logging
import os
import subprocess
from pathlib import Path
from legal_mcp import config
logger = logging.getLogger(__name__)
@@ -22,8 +30,8 @@ def _gitea_token() -> str:
return os.environ.get("GITEA_ACCESS_TOKEN") or os.environ.get("GITEA_TOKEN", "")
def _git_env() -> dict:
return {
def _git_env(case_dir: str | Path | None = None) -> dict:
env = {
"GIT_AUTHOR_NAME": "Ezer Mishpati",
"GIT_AUTHOR_EMAIL": "legal@local",
"GIT_COMMITTER_NAME": "Ezer Mishpati",
@@ -31,6 +39,13 @@ def _git_env() -> dict:
"PATH": os.environ.get("PATH", "/usr/bin:/bin"),
"GIT_TERMINAL_PROMPT": "0",
}
if case_dir is not None:
# Trust the case dir even when the running uid differs from the
# owner (prod container is uniform-root, but host runs may not be).
env["GIT_CONFIG_COUNT"] = "1"
env["GIT_CONFIG_KEY_0"] = "safe.directory"
env["GIT_CONFIG_VALUE_0"] = str(case_dir)
return env
def _refresh_remote_url(case_dir: Path, env: dict) -> bool:
@@ -68,7 +83,7 @@ def commit_and_push(case_dir: str | Path, message: str) -> bool:
if not (case_dir / ".git").exists():
return False
env = _git_env()
env = _git_env(case_dir)
subprocess.run(["git", "add", "."], cwd=case_dir, capture_output=True, env=env)
commit = subprocess.run(
@@ -90,3 +105,104 @@ def commit_and_push(case_dir: str | Path, message: str) -> bool:
logger.warning("Git push failed in %s: %s", case_dir, push.stderr)
return False
return True
# ── Periodic sweep ────────────────────────────────────────────────
#
# The user's expectation is that "anything I or an agent puts into a case
# dir ends up on Gitea". Explicit commit_and_push calls cover the API
# write paths, but agents write research/draft files directly to disk.
# A short periodic sweep is the safety net.
_SWEEP_INTERVAL_SEC = 30
def _porcelain_changes(case_dir: Path, env: dict) -> list[str]:
"""Return list of `git status --porcelain` lines, or [] if clean/error."""
res = subprocess.run(
["git", "status", "--porcelain"],
cwd=case_dir, capture_output=True, text=True, env=env,
)
if res.returncode != 0:
return []
return [ln for ln in res.stdout.splitlines() if ln.strip()]
def _auto_message(changes: list[str]) -> str:
"""Build a Hebrew commit message from porcelain output.
Groups by top-level subdir under the case dir so a sweep that picks up
one DOCX export plus one research file produces a useful summary
instead of "auto-sync".
"""
groups: dict[str, int] = {}
sample: dict[str, str] = {}
for line in changes:
path = line[3:].strip().strip('"')
if "->" in path: # rename
path = path.split("->", 1)[1].strip().strip('"')
first = path.split("/", 1)[0]
groups[first] = groups.get(first, 0) + 1
sample.setdefault(first, path)
label_map = {
"documents": "מסמכים",
"drafts": "טיוטות",
"exports": "גרסאות",
"case.json": "מטא",
"notes.md": "הערות",
}
parts: list[str] = []
for top, count in groups.items():
label = label_map.get(top, top)
parts.append(f"{label} ({count})" if count > 1 else label)
summary = " · ".join(parts) or "שינויים"
return f"אוטו: {summary}"
def sweep_once() -> dict:
"""Walk every case dir and commit+push any dirty changes.
Synchronous (subprocess-based) but cheap — `git status --porcelain` on
a clean dir is a sub-millisecond operation. Returns a small report
suitable for logging.
"""
base: Path = config.CASES_DIR
if not base.exists():
return {"checked": 0, "synced": 0, "errors": 0}
checked = synced = errors = 0
for case_dir in base.iterdir():
if not case_dir.is_dir() or not (case_dir / ".git").exists():
continue
checked += 1
changes = _porcelain_changes(case_dir, _git_env(case_dir))
if not changes:
continue
msg = _auto_message(changes)
ok = commit_and_push(case_dir, msg)
if ok:
synced += 1
logger.info("auto-sync committed %d change(s) in %s", len(changes), case_dir.name)
else:
errors += 1
return {"checked": checked, "synced": synced, "errors": errors}
async def sweep_loop(interval_sec: int = _SWEEP_INTERVAL_SEC) -> None:
"""Background task: run sweep_once forever every interval_sec.
Cancellation-safe; logs and continues on transient errors.
"""
logger.info("git_sync.sweep_loop started (interval=%ds)", interval_sec)
while True:
try:
await asyncio.sleep(interval_sec)
# Run the sync subprocess work in a thread to avoid blocking
# the FastAPI event loop.
await asyncio.to_thread(sweep_once)
except asyncio.CancelledError:
logger.info("git_sync.sweep_loop cancelled")
raise
except Exception as exc:
logger.warning("git_sync sweep iteration failed: %s", exc)

View File

@@ -7,7 +7,7 @@ from pathlib import Path
from uuid import UUID
from legal_mcp import config
from legal_mcp.services import db, embeddings, research_md
from legal_mcp.services import db, embeddings, git_sync, research_md
from legal_mcp.services.lessons import (
CITATION_GUIDANCE,
DECISION_TEMPLATES,
@@ -403,6 +403,9 @@ async def export_docx(case_number: str, output_path: str = "") -> str:
path = await docx_exporter.export_decision(case_id, output_path or None)
# Register this export as the new source of truth
await db.set_active_draft_path(case_id, path)
case_dir = config.find_case_dir(case_number)
if case_dir.exists():
git_sync.commit_and_push(case_dir, f"ייצוא DOCX: {Path(path).name}")
return json.dumps({
"status": "completed",
"path": path,
@@ -528,6 +531,9 @@ async def export_interim_draft(case_number: str, output_path: str = "") -> str:
case_id, output_path or None, mode="interim",
)
await db.set_active_draft_path(case_id, path)
case_dir = config.find_case_dir(case_number)
if case_dir.exists():
git_sync.commit_and_push(case_dir, f"טיוטת ביניים: {Path(path).name}")
return json.dumps({
"status": "completed",
"mode": "interim",
@@ -571,6 +577,9 @@ async def apply_user_edit(case_number: str, edit_filename: str) -> str:
try:
retrofit_result = docx_retrofit.retrofit_bookmarks(edit_path)
await db.set_active_draft_path(case_id, str(edit_path))
case_dir = config.find_case_dir(case_number)
if case_dir.exists():
git_sync.commit_and_push(case_dir, f"גרסת עריכה: {edit_path.name}")
return json.dumps({
"status": "completed",
"active_draft_path": str(edit_path),
@@ -681,6 +690,12 @@ async def revise_draft(case_number: str, revisions_json: str,
active_path, output_path, revisions, author=author,
)
await db.set_active_draft_path(case_id, str(output_path))
case_dir = config.find_case_dir(case_number)
if case_dir.exists():
git_sync.commit_and_push(
case_dir,
f"revise: טיוטה-v{next_ver} ({result.applied} שינויים, {result.failed} נכשלו)",
)
return json.dumps({
"status": "completed",
"output_path": str(output_path),

View File

@@ -28,7 +28,7 @@ from pydantic import BaseModel
import asyncpg
from legal_mcp import config
from legal_mcp.services import chunker, db, embeddings, extractor, processor, proofreader, research_md
from legal_mcp.services import chunker, db, embeddings, extractor, git_sync, processor, proofreader, research_md
from legal_mcp.tools import cases as cases_tools, search as search_tools, workflow as workflow_tools, drafting as drafting_tools, precedents as precedents_tools
# Import integration clients (same directory)
@@ -69,7 +69,15 @@ _progress = ProgressStore(config.REDIS_URL, ttl_seconds=PROGRESS_TTL_SECONDS)
async def lifespan(app: FastAPI):
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
await db.init_schema()
sync_task = asyncio.create_task(git_sync.sweep_loop())
try:
yield
finally:
sync_task.cancel()
try:
await sync_task
except asyncio.CancelledError:
pass
await db.close_pool()
await _progress.close()
@@ -1832,6 +1840,9 @@ async def api_research_analysis_export_docx(case_number: str):
except Exception as e:
logger.exception("Failed to export analysis DOCX for %s", case_number)
raise HTTPException(500, f"שגיאה בייצוא: {e}")
case_dir = config.find_case_dir(case_number)
if case_dir.exists():
commit_and_push(case_dir, f"ניתוח משפטי: {path.name}")
return FileResponse(
path,
media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
@@ -2260,6 +2271,10 @@ async def api_mark_final(case_number: str, filename: str):
UUID(case["id"]),
)
case_dir = config.find_case_dir(case_number)
if case_dir.exists():
commit_and_push(case_dir, f"גרסה סופית: {final_name}")
return {
"final_filename": final_name,
"training_copy": str(training_dest),