Files
legal-ai/mcp-server/src/legal_mcp/services/git_sync.py
Chaim f256eddbb1
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 1m25s
git_sync: full case-dir backup to Gitea (sweep + explicit commits)
The case repo is the user's backup, so anything in the dir must end up
on Gitea. Two layers:

1. Periodic sweep (every 30s) — git_sync.sweep_loop runs as a FastAPI
   background task. It scans every case dir, runs git status --porcelain
   on each, and calls commit_and_push on any dirty repo with an auto-built
   Hebrew message ("אוטו: טיוטות (2) · מסמכים"). This catches files written
   outside the API path: agent research artefacts, manual edits, etc.

2. Explicit commits at known write paths — DOCX export, interim draft,
   apply_user_edit, revise_draft, mark-final, analysis DOCX export.
   These give immediate feedback with descriptive messages instead of
   waiting up to 30s for the sweep.

safe.directory injection added to _git_env so sweep + explicit commits
work even when the running uid differs from the case-dir owner (host
runs vs. uniform-root container).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-30 18:27:36 +00:00

209 lines
7.3 KiB
Python

"""Git sync helpers for case repos.
Each case lives in its own git repo with a Gitea remote. The remote URL
embeds an auth token (https://chaim:TOKEN@host/...). When the token is
rotated in Infisical, repos created with the old token will fail to
push silently — only logged at WARNING level. ``commit_and_push``
re-injects the *current* token into the existing origin URL on every
call, so push survives token rotation.
This module also runs a periodic ``sweep_loop`` that catches files
written outside the API path (most importantly: agents writing research
artefacts directly to the case dir). The full case repo is the user's
backup, so anything in the dir must end up on Gitea.
"""
from __future__ import annotations
import asyncio
import logging
import os
import subprocess
from pathlib import Path
from legal_mcp import config
logger = logging.getLogger(__name__)
def _gitea_token() -> str:
    """Return the Gitea access token taken from the environment.

    ``GITEA_ACCESS_TOKEN`` is preferred when it holds a non-empty value;
    otherwise ``GITEA_TOKEN`` is used. An empty string is returned when
    neither variable provides a token.
    """
    preferred = os.environ.get("GITEA_ACCESS_TOKEN")
    if preferred:
        return preferred
    return os.environ.get("GITEA_TOKEN", "")
def _git_env(case_dir: str | Path | None = None) -> dict:
env = {
"GIT_AUTHOR_NAME": "Ezer Mishpati",
"GIT_AUTHOR_EMAIL": "legal@local",
"GIT_COMMITTER_NAME": "Ezer Mishpati",
"GIT_COMMITTER_EMAIL": "legal@local",
"PATH": os.environ.get("PATH", "/usr/bin:/bin"),
"GIT_TERMINAL_PROMPT": "0",
}
if case_dir is not None:
# Trust the case dir even when the running uid differs from the
# owner (prod container is uniform-root, but host runs may not be).
env["GIT_CONFIG_COUNT"] = "1"
env["GIT_CONFIG_KEY_0"] = "safe.directory"
env["GIT_CONFIG_VALUE_0"] = str(case_dir)
return env
def _refresh_remote_url(case_dir: Path, env: dict) -> bool:
    """Re-inject the current Gitea token into the ``origin`` remote URL.

    Any credentials already embedded in an ``https://`` origin URL are
    replaced with ``chaim:<current token>`` so that pushes keep working
    after the token has been rotated. Remotes that do not use
    ``https://`` are left untouched.

    Args:
        case_dir: Root of the case repository.
        env: Environment for the git subprocesses. It is used for *both*
            git calls so the ``safe.directory`` setting it may carry also
            applies when reading the remote URL.

    Returns:
        ``False`` when the ``origin`` URL cannot be read (no such remote,
        or git refused to operate on the directory). ``True`` otherwise —
        including when no token is available or when rewriting the URL
        failed; in those cases the push is still attempted by the caller
        and will report its own failure.
    """
    lookup = subprocess.run(
        ["git", "remote", "get-url", "origin"],
        cwd=case_dir, capture_output=True, text=True, env=env,
    )
    if lookup.returncode != 0:
        return False
    current_url = lookup.stdout.strip()

    token = _gitea_token()
    if not token:
        return True  # Push without auth — will fail, but caller decides what to do

    scheme = "https://"
    if not current_url.startswith(scheme):
        return True  # Nothing to inject into non-https remotes.

    # Strip credentials from the authority part only ("user:pass@host"),
    # so an "@" that appears later in the path is never mistaken for the
    # credential separator.
    authority, slash, path = current_url[len(scheme):].partition("/")
    host = authority.rpartition("@")[2]
    auth_url = f"{scheme}chaim:{token}@{host}{slash}{path}"
    if auth_url == current_url:
        return True

    update = subprocess.run(
        ["git", "remote", "set-url", "origin", auth_url],
        cwd=case_dir, capture_output=True, text=True, env=env,
    )
    if update.returncode != 0:
        # Log only the exit code: the command line contains the token.
        logger.warning(
            "Could not update origin URL in %s (git exit code %d)",
            case_dir, update.returncode,
        )
    return True
def commit_and_push(case_dir: str | Path, message: str) -> bool:
"""Stage, commit, refresh origin URL with current token, and push.
Best-effort: on failure logs at WARNING and returns False, but never
raises. Continues to push even if the commit was a no-op (in case
earlier commits are unpushed).
"""
case_dir = Path(case_dir)
if not (case_dir / ".git").exists():
return False
env = _git_env(case_dir)
subprocess.run(["git", "add", "."], cwd=case_dir, capture_output=True, env=env)
commit = subprocess.run(
["git", "commit", "-m", message],
cwd=case_dir, capture_output=True, text=True, env=env,
)
if commit.returncode != 0 and "nothing to commit" not in commit.stdout:
logger.warning("Git commit failed in %s: %s", case_dir, commit.stderr or commit.stdout)
if not _refresh_remote_url(case_dir, env):
logger.warning("No origin remote configured in %s — skipping push", case_dir)
return False
push = subprocess.run(
["git", "push"],
cwd=case_dir, capture_output=True, text=True, env=env,
)
if push.returncode != 0:
logger.warning("Git push failed in %s: %s", case_dir, push.stderr)
return False
return True
# ── Periodic sweep ────────────────────────────────────────────────
#
# The user's expectation is that "anything I or an agent puts into a case
# dir ends up on Gitea". Explicit commit_and_push calls cover the API
# write paths, but agents write research/draft files directly to disk.
# A short periodic sweep is the safety net.

# Seconds between two sweeps; used as the default interval of sweep_loop.
_SWEEP_INTERVAL_SEC = 30
def _porcelain_changes(case_dir: Path, env: dict) -> list[str]:
    """Collect the non-blank lines printed by ``git status --porcelain``.

    Args:
        case_dir: Directory in which git is executed.
        env: Environment passed to the git subprocess.

    Returns:
        One entry per reported change. The list is empty both when the
        working tree is clean and when git exits with a non-zero status.
    """
    status = subprocess.run(
        ["git", "status", "--porcelain"],
        cwd=case_dir,
        capture_output=True,
        text=True,
        env=env,
    )
    if status.returncode:
        return []
    # str.strip yields "" (falsy) for whitespace-only lines, dropping them.
    return list(filter(str.strip, status.stdout.splitlines()))
def _auto_message(changes: list[str]) -> str:
    """Build a Hebrew commit message from porcelain output.

    Changes are grouped by the first path component (top-level subdir or
    file) so a sweep that picks up one DOCX export plus one research file
    produces a useful summary instead of a generic "auto-sync".

    Args:
        changes: Lines in ``git status --porcelain`` (v1) form: a
            two-character status, a space, then the path. Renames and
            copies carry ``old -> new``; only the new path is counted.

    Returns:
        ``"אוטו: "`` followed by the group labels joined with ``" · "``,
        in order of first appearance. A label gets a ``(count)`` suffix
        when more than one change falls under it. Known top-level names
        are translated to Hebrew, unknown ones are used verbatim. With no
        changes the summary is a generic "changes" label.
    """
    label_map = {
        "documents": "מסמכים",
        "drafts": "טיוטות",
        "exports": "גרסאות",
        "case.json": "מטא",
        "notes.md": "הערות",
    }

    groups: dict[str, int] = {}
    for line in changes:
        status, path = line[:2], line[3:].strip()
        # Only rename/copy entries use the "old -> new" form. Checking the
        # status keeps a file whose own name contains "->" in its real
        # directory instead of being split in the middle of the name.
        if ("R" in status or "C" in status) and " -> " in path:
            path = path.split(" -> ", 1)[1].strip()
        path = path.strip('"')  # git quotes paths with special characters
        top = path.split("/", 1)[0]
        groups[top] = groups.get(top, 0) + 1

    parts = [
        f"{label_map.get(top, top)} ({count})" if count > 1 else label_map.get(top, top)
        for top, count in groups.items()
    ]
    summary = " · ".join(parts) or "שינויים"
    return f"אוטו: {summary}"
def sweep_once() -> dict:
    """Walk every case dir and commit+push any dirty changes.

    Each direct child of ``config.CASES_DIR`` that is a directory with a
    ``.git`` entry is checked with ``git status --porcelain``; dirty
    repos are committed and pushed with an auto-generated message.

    A failure while handling one case dir is logged and counted, and the
    sweep moves on to the next one, so a single broken repo cannot keep
    the remaining cases from being backed up.

    Returns:
        A report dict with the keys ``checked`` (repos inspected),
        ``synced`` (repos committed and pushed successfully) and
        ``errors`` (repos whose sync failed or raised).
    """
    report = {"checked": 0, "synced": 0, "errors": 0}

    base: Path = config.CASES_DIR
    if not base.exists():
        return report

    for case_dir in base.iterdir():
        try:
            if not case_dir.is_dir() or not (case_dir / ".git").exists():
                continue
            report["checked"] += 1

            changes = _porcelain_changes(case_dir, _git_env(case_dir))
            if not changes:
                continue

            if commit_and_push(case_dir, _auto_message(changes)):
                report["synced"] += 1
                logger.info(
                    "auto-sync committed %d change(s) in %s",
                    len(changes), case_dir.name,
                )
            else:
                report["errors"] += 1
        except Exception:
            # Per-case boundary: e.g. the dir vanished mid-sweep or git
            # could not be started. Record it and continue with the rest.
            report["errors"] += 1
            logger.warning("auto-sync failed for %s", case_dir.name, exc_info=True)

    return report
async def sweep_loop(interval_sec: int = _SWEEP_INTERVAL_SEC) -> None:
    """Background task: sleep ``interval_sec`` seconds, sweep, repeat forever.

    Cancellation is logged and re-raised so the task ends cleanly. Any
    other exception raised during an iteration is logged at WARNING level
    and the loop carries on with the next iteration.
    """

    async def _wait_then_sweep() -> None:
        await asyncio.sleep(interval_sec)
        # sweep_once blocks on subprocesses; running it in a worker thread
        # keeps the event loop responsive.
        await asyncio.to_thread(sweep_once)

    logger.info("git_sync.sweep_loop started (interval=%ds)", interval_sec)
    while True:
        try:
            await _wait_then_sweep()
        except asyncio.CancelledError:
            logger.info("git_sync.sweep_loop cancelled")
            raise
        except Exception as err:
            logger.warning("git_sync sweep iteration failed: %s", err)