"""Periodic corpus-integrity audit. Runs a set of read-only SQL checks against the legal-ai DB to detect rows that violate domain constraints which are *not* enforced by the schema (or were added after the constraint was put in place). Checks performed: A. ``case_law`` rows with ``source_kind='external_upload'`` whose ``case_number`` starts with the Hebrew prefixes ``ערר`` / ``בל"מ``. Internal committee decisions belong to ``source_kind='internal_committee'``. B. ``case_law`` rows with ``source_kind='internal_committee'`` that lack a ``chair_name`` and/or ``district``. Internal decisions must carry both. C. ``cases`` rows with a ``practice_area`` outside the closed set {``rishuy_uvniya``, ``betterment_levy``, ``compensation_197``, ``''``}. Output: * Appends a timestamped block to ``data/logs/corpus_integrity_audit.log``. * If hits are found AND env ``PAPERCLIP_API_URL`` + ``PAPERCLIP_API_KEY`` are set, posts a CEO wakeup comment via ``POST /api/agents/{ceo}/wakeup`` (best-effort, never fails the script). * Always exits 0 unless an unexpected error occurs (so cron stays quiet). Cron suggestion (daily 07:00): 0 7 * * * /home/chaim/legal-ai/mcp-server/.venv/bin/python \\ /home/chaim/legal-ai/scripts/audit_corpus_integrity.py Idempotent. Read-only on the DB. """ from __future__ import annotations import argparse import asyncio import logging import os import sys from datetime import datetime, timezone from pathlib import Path # Load ~/.env so POSTGRES_* / PAPERCLIP_* are picked up when run from cron. 
ENV_PATH = os.path.expanduser("~/.env")


def _load_env_file(path: str) -> None:
    """Populate ``os.environ`` from a simple ``KEY=VALUE`` file, if it exists.

    Variables already present in the environment win (``setdefault``), so
    anything exported by the caller overrides the file.

    Parsing rules:
    * Blank lines, ``#`` comment lines and lines without ``=`` are skipped.
    * Whitespace around the key and the value is stripped.
    * A leading ``export `` on the key is dropped, so shell-style files work.
    * One pair of matching surrounding quotes (``'`` or ``"``) is removed
      from the value.
    * Lines with an empty key are skipped. ``os.environ`` rejects an empty
      variable name with ValueError, which would otherwise abort the script.
    """
    if not os.path.isfile(path):
        return
    with open(path, encoding="utf-8") as fh:
        for raw in fh:
            line = raw.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, value = line.split("=", 1)
            key = key.strip()
            if key.startswith("export "):
                key = key[len("export "):].strip()
            value = value.strip()
            if len(value) >= 2 and value[0] == value[-1] and value[0] in ("'", '"'):
                value = value[1:-1]
            if key:
                os.environ.setdefault(key, value)


_load_env_file(ENV_PATH)

import asyncpg  # noqa: E402

try:
    import httpx  # noqa: E402
except ImportError:
    # httpx is part of the legal-ai venv; not required for DB checks
    httpx = None  # type: ignore[assignment]

REPO_ROOT = Path(__file__).resolve().parent.parent
LOG_PATH = REPO_ROOT / "data" / "logs" / "corpus_integrity_audit.log"

# A: external uploads whose case_number carries an internal-committee prefix.
CHECK_A_SQL = (
    "SELECT id, case_number FROM case_law "
    "WHERE source_kind = 'external_upload' AND case_number ~ '^ערר|^בל\"מ' "
    "ORDER BY case_number"
)
# B: internal-committee decisions missing chair_name and/or district
# (NULL and empty string both count as missing).
CHECK_B_SQL = (
    "SELECT id, case_number, chair_name, district FROM case_law "
    "WHERE source_kind = 'internal_committee' "
    "AND (chair_name IS NULL OR chair_name = '' "
    " OR district IS NULL OR district = '') "
    "ORDER BY case_number"
)
# C: cases whose practice_area is outside the closed set (NULL is not flagged).
CHECK_C_SQL = (
    "SELECT id, case_number, practice_area FROM cases "
    "WHERE practice_area IS NOT NULL "
    "AND practice_area NOT IN ('rishuy_uvniya', 'betterment_levy', "
    " 'compensation_197', '') "
    "ORDER BY case_number"
)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger("audit_corpus_integrity")


def _pg_url() -> str:
    """Resolve the Postgres DSN from the environment.

    ``POSTGRES_URL`` is returned verbatim when set. Otherwise a
    ``postgres://`` URL is built from the discrete ``POSTGRES_HOST`` /
    ``POSTGRES_PORT`` / ``POSTGRES_USER`` / ``POSTGRES_PASSWORD`` /
    ``POSTGRES_DB`` variables. The defaults are 127.0.0.1, 5433, legal_ai
    and legal_ai; there is no default password.

    User, password and database name are percent-encoded. Characters such as
    ``@``, ``:``, ``/``, ``#`` or ``%`` in a password would otherwise corrupt
    the URL, for example by being parsed as the host separator.

    Raises:
        SystemExit: neither ``POSTGRES_URL`` nor ``POSTGRES_PASSWORD`` is set.
        ValueError: ``POSTGRES_PORT`` is not an integer.
    """
    from urllib.parse import quote

    url = os.environ.get("POSTGRES_URL")
    if url:
        return url
    pg_host = os.environ.get("POSTGRES_HOST", "127.0.0.1")
    pg_port = int(os.environ.get("POSTGRES_PORT", "5433"))
    pg_user = os.environ.get("POSTGRES_USER", "legal_ai")
    pg_pw = os.environ.get("POSTGRES_PASSWORD", "")
    pg_db = os.environ.get("POSTGRES_DB", "legal_ai")
    if not pg_pw:
        raise SystemExit("POSTGRES_PASSWORD / POSTGRES_URL not set")
    user_q = quote(pg_user, safe="")
    pw_q = quote(pg_pw, safe="")
    db_q = quote(pg_db, safe="")
    return f"postgres://{user_q}:{pw_q}@{pg_host}:{pg_port}/{db_q}"


async def _run_check(conn: asyncpg.Connection, sql: str) -> list[dict]:
    """Run one read-only audit query and return each row as a plain dict."""
    rows = await conn.fetch(sql)
    return [dict(r) for r in rows]
# Rows listed per check in the report; any remainder is summarised as
# "... (N more truncated)".
_MAX_ROWS_PER_CHECK = 50


def _paperclip_env() -> tuple[str, str] | None:
    """Return ``(base_url, api_key)`` for the Paperclip API, or None.

    Returns None when ``PAPERCLIP_API_URL`` or ``PAPERCLIP_API_KEY`` is
    missing or empty, or when httpx could not be imported. A trailing ``/``
    is removed from the base URL so ``f"{base_url}/api/..."`` never builds a
    ``//api`` path.
    """
    base_url = os.environ.get("PAPERCLIP_API_URL")
    api_key = os.environ.get("PAPERCLIP_API_KEY")
    if not (base_url and api_key and httpx is not None):
        return None
    return base_url.rstrip("/"), api_key


async def _resolve_ceo_agent_id() -> str | None:
    """Best-effort: look up the CEO agent UUID for CMP via the API.

    Calls ``GET {PAPERCLIP_API_URL}/api/agents``. The response may be a bare
    list or a dict with an ``items`` list. Returns the ``id`` of the first
    agent that has an id and whose ``title`` or ``role`` contains "ceo"
    (case-insensitive), or whose title contains "מנכ" (Hebrew for
    managing director).

    Non-dict entries and matching entries without an ``id`` are skipped, so
    one malformed record does not hide a valid CEO further down the list.

    Returns None if the Paperclip env is missing, the request fails, or
    nothing matches. Never raises.
    """
    env = _paperclip_env()
    if env is None:
        return None
    base_url, api_key = env
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            r = await client.get(
                f"{base_url}/api/agents",
                headers={"Authorization": f"Bearer {api_key}"},
            )
            r.raise_for_status()
            payload = r.json()
        if isinstance(payload, list):
            items = payload
        elif isinstance(payload, dict):
            items = payload.get("items") or []
        else:
            items = []
        for item in items:
            if not isinstance(item, dict):
                continue
            # Look for a CMP-side CEO (master); the CMPA mirror has a different id.
            # NOTE(review): nothing below distinguishes CMP from the CMPA mirror —
            # the first matching agent in API order wins. Confirm that ordering.
            title = str(item.get("title") or "").lower()
            role = str(item.get("role") or "").lower()
            agent_id = item.get("id")
            if agent_id and ("ceo" in title or "ceo" in role or "מנכ" in title):
                return agent_id
    except Exception as e:
        logger.warning("CEO lookup failed: %s", e)
    return None


async def _notify_ceo(summary: str) -> bool:
    """Post a wakeup comment to the CEO agent.

    Sends ``POST {PAPERCLIP_API_URL}/api/agents/{ceo_id}/wakeup`` with
    *summary* in the JSON payload.

    Returns:
        True if the request succeeded. False if the env is missing, the CEO
        id cannot be resolved, or the request fails. HTTP and network errors
        are logged, not raised.
    """
    env = _paperclip_env()
    if env is None:
        logger.info("Paperclip env not set — skipping CEO wakeup")
        return False
    base_url, api_key = env
    ceo_id = await _resolve_ceo_agent_id()
    if not ceo_id:
        logger.info("Could not resolve CEO agent id — skipping wakeup")
        return False
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            r = await client.post(
                f"{base_url}/api/agents/{ceo_id}/wakeup",
                headers={
                    "Authorization": f"Bearer {api_key}",
                    "Content-Type": "application/json",
                },
                json={
                    "source": "automation",
                    "triggerDetail": "audit_corpus_integrity",
                    "reason": "corpus integrity audit found violations",
                    "payload": {"summary": summary},
                },
            )
            r.raise_for_status()
        logger.info("Notified CEO (agent_id=%s)", ceo_id)
        return True
    except Exception as e:
        logger.warning("CEO wakeup failed: %s", e)
        return False


def _format_section(header: str, hits: list[dict], render_row) -> list[str]:
    """Render one check as report lines.

    Produces the header line, then up to ``_MAX_ROWS_PER_CHECK`` rows
    rendered by ``render_row(row) -> str``, then a truncation note when
    more rows exist, and finally a blank separator line.
    """
    lines = [header]
    lines.extend(render_row(row) for row in hits[:_MAX_ROWS_PER_CHECK])
    overflow = len(hits) - _MAX_ROWS_PER_CHECK
    if overflow > 0:
        lines.append(f" ... ({overflow} more truncated)")
    lines.append("")
    return lines


def _format_report(
    a_hits: list[dict],
    b_hits: list[dict],
    c_hits: list[dict],
    ts: datetime,
) -> str:
    """Build the plain-text audit report for checks A, B and C.

    The report starts with a timestamp banner (``ts.isoformat()``). Each
    check then gets a section with its hit count and up to
    ``_MAX_ROWS_PER_CHECK`` listed rows. Rows must contain ``id`` and
    ``case_number``; other columns are read with ``.get``.
    """
    parts: list[str] = [f"=== Corpus integrity audit @ {ts.isoformat()} ===", ""]
    parts += _format_section(
        f"Check A (case_law external_upload with internal-style "
        f"case_number prefix): {len(a_hits)} hit(s)",
        a_hits,
        lambda row: f" - id={row['id']} case_number={row['case_number']!r}",
    )
    parts += _format_section(
        f"Check B (case_law internal_committee missing chair_name/district): "
        f"{len(b_hits)} hit(s)",
        b_hits,
        lambda row: (
            f" - id={row['id']} case_number={row['case_number']!r} "
            f"chair_name={row.get('chair_name')!r} district={row.get('district')!r}"
        ),
    )
    parts += _format_section(
        f"Check C (cases.practice_area outside closed set): {len(c_hits)} hit(s)",
        c_hits,
        lambda row: (
            f" - id={row['id']} case_number={row['case_number']!r} "
            f"practice_area={row.get('practice_area')!r}"
        ),
    )
    return "\n".join(parts)


async def main(args: argparse.Namespace) -> int:
    """Run all checks, log and print the report, and optionally wake the CEO.

    The DB connection is used only for the three SELECTs and is closed in a
    ``finally``. The report is always appended to ``LOG_PATH``, creating the
    directory if needed, and echoed to stdout. When any check has hits and
    ``args.notify`` is true, a Hebrew summary is posted via ``_notify_ceo``.

    Returns 0 whether or not violations were found. DB errors propagate.
    """
    pg_url = _pg_url()
    conn = await asyncpg.connect(pg_url)
    try:
        a_hits = await _run_check(conn, CHECK_A_SQL)
        b_hits = await _run_check(conn, CHECK_B_SQL)
        c_hits = await _run_check(conn, CHECK_C_SQL)
    finally:
        await conn.close()

    total = len(a_hits) + len(b_hits) + len(c_hits)
    ts = datetime.now(timezone.utc)
    report = _format_report(a_hits, b_hits, c_hits, ts)

    # Always write to log (creates dir + file if missing).
    LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
    with LOG_PATH.open("a", encoding="utf-8") as f:
        f.write(report)
        f.write("\n")

    # Echo to stdout so cron mail / manual run shows the result.
    print(report)

    if total == 0:
        logger.info("clean: no integrity violations found")
        return 0

    logger.warning(
        "found %d total violation(s) (A=%d, B=%d, C=%d)",
        total, len(a_hits), len(b_hits), len(c_hits),
    )

    if args.notify:
        summary_lines = [
            "ה-audit היומי על הקורפוס מצא הפרות:",
            f"- Check A (external_upload עם prefix פנימי): {len(a_hits)}",
            f"- Check B (internal_committee חסר chair/district): {len(b_hits)}",
            f"- Check C (cases.practice_area לא תקין): {len(c_hits)}",
            "",
            f"פירוט מלא: {LOG_PATH}",
        ]
        await _notify_ceo("\n".join(summary_lines))

    return 0


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--no-notify",
        dest="notify",
        action="store_false",
        help="Don't post a CEO wakeup even if hits are found",
    )
    parser.set_defaults(notify=True)
    args = parser.parse_args()
    try:
        rc = asyncio.run(main(args))
    except KeyboardInterrupt:
        sys.exit(130)
    sys.exit(rc)