"""Configuration loaded from environment variables, a central .env file, or Infisical.

Effective priority (highest first):
process environment → .env file → Infisical secrets.

``load_dotenv`` is called without ``override`` and Infisical secrets are
applied with ``os.environ.setdefault``, so neither source replaces a variable
that is already set. Infisical therefore only fills in variables that are
missing from both the environment and the .env file.
"""

import logging
import os
from pathlib import Path

from dotenv import load_dotenv

# Underscore-prefixed so ``from <this module> import *`` does not export it.
_logger = logging.getLogger(__name__)

# Load from central .env or override path
dotenv_path = os.environ.get("DOTENV_PATH", str(Path.home() / ".env"))
load_dotenv(dotenv_path)

# Try loading from Infisical if configured.
# The token is read after load_dotenv(), so it may come from the .env file.
INFISICAL_TOKEN = os.environ.get("INFISICAL_TOKEN", "")
if INFISICAL_TOKEN:
    try:
        from infisical_sdk import InfisicalSDKClient
    except ImportError:
        # Best effort: a token is configured but the SDK is missing.
        _logger.warning(
            "INFISICAL_TOKEN is set but infisical_sdk is not installed; "
            "using environment/.env values only"
        )
    else:
        try:
            # NOTE(review): confirm this constructor/method signature against
            # the installed infisical_sdk version; any mismatch lands in the
            # handler below and is now logged instead of silently ignored.
            _client = InfisicalSDKClient(token=INFISICAL_TOKEN)
            _secrets = _client.get_all_secrets(
                environment=os.environ.get("INFISICAL_ENV", "production"),
                project_id=os.environ.get("INFISICAL_PROJECT_ID", ""),
            )
            for _secret in _secrets:
                # setdefault: never replace a value that is already set.
                os.environ.setdefault(_secret.secret_key, _secret.secret_value)
        except Exception as exc:
            # Deliberate best effort: fall back to environment/.env values.
            # Only the exception type is logged at warning level so that no
            # secret material from the message ends up in the logs.
            _logger.warning(
                "Could not load secrets from Infisical (%s); "
                "using environment/.env values only",
                type(exc).__name__,
            )
            _logger.debug("Infisical failure details", exc_info=True)

# PostgreSQL
# NOTE: when POSTGRES_URL is not set, the user and password are interpolated
# verbatim, so they must not contain URL-reserved characters such as "@" or "/".
POSTGRES_URL = os.environ.get(
    "POSTGRES_URL",
    f"postgres://{os.environ.get('POSTGRES_USER', 'legal_ai')}:"
    f"{os.environ.get('POSTGRES_PASSWORD', '')}@"
    f"{os.environ.get('POSTGRES_HOST', '127.0.0.1')}:"
    f"{os.environ.get('POSTGRES_PORT', '5433')}/"
    f"{os.environ.get('POSTGRES_DB', 'legal_ai')}",
)

# Redis
REDIS_URL = os.environ.get("REDIS_URL", "redis://127.0.0.1:6380/0")

# Voyage AI
VOYAGE_API_KEY = os.environ.get("VOYAGE_API_KEY", "")
VOYAGE_MODEL = os.environ.get("VOYAGE_MODEL", "voyage-law-2")
VOYAGE_DIMENSIONS = 1024

# Google Cloud Vision (OCR for scanned PDFs)
GOOGLE_CLOUD_VISION_API_KEY = os.environ.get("GOOGLE_CLOUD_VISION_API_KEY", "")

# Data directory
DATA_DIR = Path(os.environ.get("DATA_DIR", str(Path.home() / "legal-ai" / "data")))
TRAINING_DIR = DATA_DIR / "training"
EXPORTS_DIR = DATA_DIR / "exports"  # legacy exports only

# Cases directory — flat structure: data/cases/{case_number}/
CASES_DIR = DATA_DIR / "cases"


def find_case_dir(case_number: str) -> Path:
    """Return the case directory for a given case number.

    The path is built as ``CASES_DIR / case_number``. Nothing is created on
    disk and the directory is not checked for existence.
    """
    return CASES_DIR / case_number
# Chunking parameters
CHUNK_SIZE_TOKENS = 600
CHUNK_OVERLAP_TOKENS = 100

# External service allowlist — case materials may ONLY be sent to these domains
ALLOWED_EXTERNAL_SERVICES = {
    "api.voyageai.com",  # Voyage AI (embeddings)
    "vision.googleapis.com",  # Google Cloud Vision (OCR)
}

# Audit — on unless AUDIT_ENABLED is set to something other than "true"
# (compared case-insensitively).
AUDIT_ENABLED = os.environ.get("AUDIT_ENABLED", "true").lower() == "true"


# ── Utility ───────────────────────────────────────────────────────


def _truncation_checkpoints(fragment: str) -> list:
    """Return ``(end, closers)`` pairs where *fragment* can be cut and closed.

    *fragment* must start at a ``{`` or ``[``. The scan tracks string
    literals (including backslash escapes) and a stack of open brackets, so
    brackets inside strings are ignored. Each time a ``}`` or ``]`` closes a
    nested value, the position just after it is recorded together with the
    closing brackets still owed at that point. ``fragment[:end] + closers``
    is then a candidate for a complete JSON document.

    The scan stops at the first mismatched closer, or once the outermost
    bracket has been closed (anything after that is trailing text).
    """
    closer_for = {"{": "}", "[": "]"}
    pending = []  # closers still owed, innermost last
    checkpoints = []
    in_string = False
    escaped = False

    for index, char in enumerate(fragment):
        if in_string:
            if escaped:
                escaped = False
            elif char == "\\":
                escaped = True
            elif char == '"':
                in_string = False
        elif char == '"':
            in_string = True
        elif char in closer_for:
            pending.append(closer_for[char])
        elif char == "}" or char == "]":
            if not pending or pending[-1] != char:
                break  # structure is corrupt from here on
            pending.pop()
            checkpoints.append((index + 1, "".join(reversed(pending))))
            if not pending:
                break  # outermost value is complete

    return checkpoints


def parse_llm_json(raw: str):
    """Parse JSON from LLM response, handling markdown wrapping and truncation.

    Strategies, tried in order:
    1. Strip a leading/trailing markdown ```json ... ``` fence and parse.
    2. Parse the span from the first ``{`` to the last ``}``, or from the
       first ``[`` to the last ``]``. When the array span encloses the object
       span the array is tried first, so a wrapped list is not reduced to one
       of its elements.
    3. Truncated JSON (missing closing brackets): starting at the outermost
       opening bracket, cut just after the last closing bracket and append the
       brackets that are still open. A trailing element recovered this way
       may be missing the keys that were cut off.

    Args:
        raw: The raw text returned by the model.

    Returns:
        The decoded JSON value, or ``None`` if *raw* is not a string or
        nothing could be parsed. A literal JSON ``null`` also yields ``None``.
    """
    import json
    import re

    if not isinstance(raw, str):
        return None

    text = raw.strip()

    # Strip markdown code blocks
    text = re.sub(r"^```(?:json)?\s*\n?", "", text)
    text = re.sub(r"\n?\s*```\s*$", "", text)

    # Try direct parse first
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass

    # Try to find a JSON object or array embedded in surrounding text.
    obj_start, obj_end = text.find("{"), text.rfind("}")
    arr_start, arr_end = text.find("["), text.rfind("]")
    spans = []
    if obj_start != -1 and obj_end > obj_start:
        spans.append((obj_start, obj_end))
    if arr_start != -1 and arr_end > arr_start:
        # Outermost first: an array that wraps the object span goes ahead of it.
        encloses = bool(spans) and arr_start < obj_start and arr_end > obj_end
        spans.insert(0 if encloses else len(spans), (arr_start, arr_end))
    for start, end in spans:
        try:
            return json.loads(text[start:end + 1])
        except json.JSONDecodeError:
            continue

    # Attempt truncated JSON recovery, outermost opening bracket first.
    starts = sorted(pos for pos in (text.find("{"), text.find("[")) if pos != -1)
    for start in starts:
        fragment = text[start:]
        # Latest checkpoint first, to keep as much of the data as possible.
        for end, closers in reversed(_truncation_checkpoints(fragment)):
            try:
                return json.loads(fragment[:end] + closers)
            except json.JSONDecodeError:
                continue

    return None