Files
legal-ai/web/gitea_client.py

99 lines
3.1 KiB
Python

"""Gitea REST API client — create repos in the 'cases' org and push."""
from __future__ import annotations
import logging
import os
import subprocess
from pathlib import Path
import httpx
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Gitea organization used in the API paths below: repositories are created
# under /orgs/<GITEA_ORG>/repos and fetched from /repos/<GITEA_ORG>/<name>.
GITEA_ORG = "cases"
def _host() -> str:
    """Return the base URL of the Gitea server, without a trailing slash.

    The value is read from the ``GITEA_HOST`` environment variable on every
    call. Surrounding whitespace and trailing slashes are removed, because
    callers append paths that begin with ``/`` (e.g. ``/api/v1/...``) and a
    leftover slash would yield ``//api/v1/...`` URLs. When the variable is
    unset or empty after cleaning, the built-in default host is returned.
    """
    configured = os.environ.get("GITEA_HOST", "").strip().rstrip("/")
    return configured or "https://gitea.nautilus.marcusgroup.org"
def _token() -> str:
    """Return the Gitea API token taken from the environment.

    ``GITEA_ACCESS_TOKEN`` is consulted first and ``GITEA_TOKEN`` second;
    the first one holding a non-empty value wins. An empty string is
    returned when neither variable provides a value.
    """
    for variable in ("GITEA_ACCESS_TOKEN", "GITEA_TOKEN"):
        candidate = os.environ.get(variable)
        if candidate:
            return candidate
    return ""
async def create_repo(case_number: str, title: str, description: str = "") -> dict:
    """Create a private repo in the 'cases' org on Gitea, or fetch it if it exists.

    Args:
        case_number: Case identifier; used verbatim as the repository name
            (e.g. "1130-25").
        title: Case title, appended to the default description.
        description: Optional explicit repository description. When empty,
            a default built from ``case_number`` and ``title`` is used.

    Returns:
        The decoded JSON body of Gitea's response: the newly created
        repository, or the already existing one when the create request is
        answered with HTTP 409.

    Raises:
        httpx.HTTPStatusError: If the create request fails with an error
            status other than 409, or if fetching the existing repository
            fails.
    """
    repo_name = case_number  # e.g. "1130-25"

    # Honor the caller-supplied description; fall back to the generated one
    # only when none was given. Either way the text is cut to 255 characters.
    repo_description = (description or f"ערר {case_number}{title}")[:255]

    # Resolve host and credentials once so both requests use the same values.
    base_url = _host()
    auth_headers = {"Authorization": f"token {_token()}"}

    # NOTE(security): verify=False turns off TLS certificate verification
    # while the access token is sent in a header. Kept as-is to avoid
    # breaking the current deployment (presumably a self-signed or private-CA
    # certificate — confirm), but this should be replaced by a CA bundle.
    async with httpx.AsyncClient(verify=False, timeout=30) as client:
        resp = await client.post(
            f"{base_url}/api/v1/orgs/{GITEA_ORG}/repos",
            headers=auth_headers,
            json={
                "name": repo_name,
                "description": repo_description,
                "private": True,
                "auto_init": False,
            },
        )
        if resp.status_code == 409:
            # Repo already exists — fetch it instead of failing.
            existing = await client.get(
                f"{base_url}/api/v1/repos/{GITEA_ORG}/{repo_name}",
                headers=auth_headers,
            )
            existing.raise_for_status()
            return existing.json()
        resp.raise_for_status()
        return resp.json()
def setup_remote_and_push(case_dir: str | Path, repo_clone_url: str) -> bool:
    """Add Gitea as remote 'origin' (or update it) and push.

    Args:
        case_dir: Directory of the local git working tree.
        repo_clone_url: HTTPS clone URL of the Gitea repository. The access
            token is embedded into it for authentication.

    Returns:
        True when pushing ``main`` or, failing that, ``master`` succeeded.
        False when ``case_dir`` has no ``.git`` entry, the remote could not
        be configured, or both pushes failed.
    """
    case_dir = Path(case_dir)
    if not (case_dir / ".git").exists():
        return False

    env = {
        "GIT_AUTHOR_NAME": "Ezer Mishpati",
        "GIT_AUTHOR_EMAIL": "legal@local",
        "GIT_COMMITTER_NAME": "Ezer Mishpati",
        "GIT_COMMITTER_EMAIL": "legal@local",
        "PATH": os.environ.get("PATH", "/usr/bin:/bin"),
        # Make git fail instead of blocking on an interactive credential
        # prompt when the token is missing or rejected.
        "GIT_TERMINAL_PROMPT": "0",
    }

    token = _token()

    def _redact(text: str) -> str:
        """Mask the access token in git output before it is logged."""
        return text.replace(token, "***") if token else text

    # Inject token into clone URL for auth. Only the leading scheme is
    # rewritten (count=1).
    # NOTE(security): the resulting URL, token included, is written to the
    # repository's .git/config by `git remote add/set-url`.
    auth_url = repo_clone_url.replace("https://", f"https://chaim:{token}@", 1)

    # Check if remote exists (read-only probe, runs with the inherited
    # environment).
    probe = subprocess.run(
        ["git", "remote", "get-url", "origin"],
        cwd=case_dir, capture_output=True, text=True,
    )
    remote_action = "set-url" if probe.returncode == 0 else "add"
    configured = subprocess.run(
        ["git", "remote", remote_action, "origin", auth_url],
        cwd=case_dir, capture_output=True, text=True, env=env,
    )
    if configured.returncode != 0:
        # Without a correctly configured remote a push would go to the wrong
        # place or fail anyway, so report the failure right here.
        logger.warning(
            "Git remote %s failed: %s", remote_action, _redact(configured.stderr)
        )
        return False

    # Push: try 'main' first, then fall back to 'master'.
    push_result = None
    for branch in ("main", "master"):
        push_result = subprocess.run(
            ["git", "push", "-u", "origin", branch],
            cwd=case_dir, capture_output=True, text=True, env=env,
        )
        if push_result.returncode == 0:
            return True

    # git error output may quote the remote URL, which carries the token.
    logger.warning("Git push failed: %s", _redact(push_result.stderr))
    return False