legal-ai/scripts/audit_corpus_integrity.py
Chaim 242e6cfd11
All checks were successful
G12 Leak-Guard / leak-guard (pull_request) Successful in 5s
fix(learning): chair_name at source, so committee finals always enter the case-law corpus (TaskMaster #134)
The bug: the learning stage (ingest_final_version → ingest_internal_decision)
adds every final as a citable precedent in case_law
(source_kind=internal_committee), but it failed silently (a non-fatal warning)
whenever cases.chair_name was empty, because of the
case_law_internal_chair_check constraint. As a result, the finals of
1194/1200/8070 never entered the case-law corpus. Root cause: (1) chair_name
was not set when a case was opened; (2) the MCP path passed the raw chair
value while the UI path (web/) already resolved it deterministically:
**parallel paths that diverge (a violation of INV-G2)**; (3) the failure was
swallowed (contrary to §6).

Root-cause fix (3 layers):
1. **Single SoT (INV-G2):** `config.committee_chair_for_case` is now the only
   place from which web/app.py, tools/workflow.py and db.create_case derive
   the chair (by case-number prefix; overridable via env). web/ was unified
   onto it (the duplicate logic was removed).
2. **Normalize at source (INV-G1):** `db.create_case` always sets a non-empty
   chair_name; `cases.case_create` exposes the param. `ingest_final_version`
   derives the chair from the SoT instead of the raw value, so the constraint
   no longer fails.
3. **Visibility (§6/feedback_silent_swallow):** a failed corpus copy is
   returned in the result (`internal_corpus_error`) and
   `final_learning_pipeline` prints a warning instead of swallowing it.
   Backfilled 11 cases with an empty chair. `audit_corpus_integrity`: added
   CHECK_D (decided cases with no chair) and CHECK_E (status=final cases
   missing from the case-law corpus); both are currently 0.
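
Layers 1 and 2 can be sketched as one resolver that every write path calls, plus a write-time normalization that never stores an empty chair. This is a minimal sketch under stated assumptions: the prefix table, the default chair, the `COMMITTEE_CHAIR_OVERRIDE` variable and the `resolve_chair` / `create_case_row` helpers are all hypothetical stand-ins, not the real `config.committee_chair_for_case` or `db.create_case`.

```python
"""Hypothetical sketch: a single source of truth for the committee chair."""
from __future__ import annotations

import os

# HYPOTHETICAL prefix -> chair table; the real mapping is not part of this commit.
_CHAIR_BY_PREFIX: dict[str, str] = {
    "1": "Chair A",
    "8": "Chair B",
}
_DEFAULT_CHAIR = "Default Chair"  # hypothetical fallback so the result is never empty


def resolve_chair(case_number: str) -> str:
    """Return a non-empty chair name for ``case_number``.

    Assumed resolution order: env override, then the longest matching
    case-number prefix, then a default.
    """
    override = os.environ.get("COMMITTEE_CHAIR_OVERRIDE", "").strip()  # hypothetical env var
    if override:
        return override
    number = case_number.strip()
    # Try longer prefixes first so a more specific entry wins.
    for prefix in sorted(_CHAIR_BY_PREFIX, key=len, reverse=True):
        if number.startswith(prefix):
            return _CHAIR_BY_PREFIX[prefix]
    return _DEFAULT_CHAIR


def create_case_row(case_number: str, chair_name: str | None = None) -> dict:
    """Normalize on write: an explicit chair wins, otherwise ask the resolver."""
    chair = (chair_name or "").strip() or resolve_chair(case_number)
    return {"case_number": case_number, "chair_name": chair}
```

Because web/, the MCP tools and the case-creation path would all call the same resolver, they cannot map the same case number to different chairs, and a blank or missing chair is filled in before it reaches the constraint.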

invariants: satisfies INV-G1 (normalization on write), INV-G2 (a single path;
web↔MCP unified) and §6 (no silent swallowing). Tests: py_compile + 14 pytest
tests (chair_seed_gate, audit_provenance) + a create_case integration test
(default+override) + a run of the live audit (A through E all 0).
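
The §6 rule (no silent swallowing) can be sketched as follows, assuming the constraint violation surfaces as an exception. `ChairConstraintError`, `copy_to_internal_corpus` and the `_sketch` functions are hypothetical stand-ins; only the `internal_corpus_error` result key and the "pipeline prints a warning" behavior come from this commit. The copy stays non-fatal, but its failure travels back to the caller instead of ending in a log line nobody reads.

```python
"""Hypothetical sketch: report a failed corpus copy instead of swallowing it."""
from __future__ import annotations

import logging

logger = logging.getLogger("final_learning_sketch")


class ChairConstraintError(Exception):
    """Stands in for a DB check-constraint violation (hypothetical)."""


def copy_to_internal_corpus(case: dict) -> None:
    # Mirrors the idea of case_law_internal_chair_check: an
    # internal_committee row must carry a non-empty chair_name.
    if not (case.get("chair_name") or "").strip():
        raise ChairConstraintError("chair_name is empty")


def ingest_final_version_sketch(case: dict) -> dict:
    """Ingest a final; a failed corpus copy is recorded in the result."""
    result: dict = {"case_number": case["case_number"], "internal_corpus_error": None}
    try:
        copy_to_internal_corpus(case)
    except ChairConstraintError as exc:
        # Non-fatal for the rest of ingestion, but visible to the caller.
        result["internal_corpus_error"] = str(exc)
    return result


def final_learning_pipeline_sketch(cases: list[dict]) -> list[str]:
    """Ingest each case; log and return one warning per failed copy."""
    warnings: list[str] = []
    for case in cases:
        res = ingest_final_version_sketch(case)
        if res["internal_corpus_error"]:
            msg = f"{res['case_number']}: {res['internal_corpus_error']}"
            logger.warning("internal corpus copy failed: %s", msg)
            warnings.append(msg)
    return warnings
```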

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-12 07:25:54 +00:00

333 lines
12 KiB
Python

"""Periodic corpus-integrity audit.

Runs a set of read-only SQL checks against the legal-ai DB to detect rows
that violate domain constraints which are *not* enforced by the schema
(or that predate a constraint added later).

Checks performed:

A. ``case_law`` rows with ``source_kind='external_upload'`` whose
   ``case_number`` starts with the Hebrew prefixes ``ערר`` / ``בל"מ``.
   Internal committee decisions belong to ``source_kind='internal_committee'``.
B. ``case_law`` rows with ``source_kind='internal_committee'`` that
   lack a ``chair_name`` and/or ``district``. Internal decisions must
   carry both.
C. ``cases`` rows with a ``practice_area`` outside the closed set
   {``rishuy_uvniya``, ``betterment_levy``, ``compensation_197``, ``''``}.
D. ``cases`` rows in a decided state (``final``, ``exported``, ``reviewed``)
   with no ``chair_name``. An empty chair silently breaks the
   ``internal_committee`` corpus copy of the final.
E. ``cases`` rows with ``status='final'`` that have no ``case_law`` row with
   the same ``case_number`` and ``source_kind='internal_committee'``.

Output:

* Appends a timestamped block to ``data/logs/corpus_integrity_audit.log``.
* If hits are found AND env ``PAPERCLIP_API_URL`` + ``PAPERCLIP_API_KEY``
  are set, posts a CEO wakeup comment via ``POST /api/agents/{ceo}/wakeup``
  (best-effort, never fails the script).
* Always exits 0 unless an unexpected error occurs (so cron stays quiet).

Cron suggestion (daily 07:00):

    0 7 * * * /home/chaim/legal-ai/mcp-server/.venv/bin/python \\
        /home/chaim/legal-ai/scripts/audit_corpus_integrity.py

Idempotent. Read-only on the DB.
"""
from __future__ import annotations

import argparse
import asyncio
import logging
import os
import sys
from datetime import datetime, timezone
from pathlib import Path

# Load ~/.env so POSTGRES_* / PAPERCLIP_* are picked up when run from cron.
# Variables already set in the environment take precedence (setdefault).
ENV_PATH = os.path.expanduser("~/.env")
if os.path.isfile(ENV_PATH):
    with open(ENV_PATH, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line and not line.startswith("#") and "=" in line:
                k, v = line.split("=", 1)
                os.environ.setdefault(k, v)

import asyncpg  # noqa: E402

try:
    import httpx  # noqa: E402
except ImportError:  # httpx is part of the legal-ai venv; not required for DB checks
    httpx = None  # type: ignore[assignment]

REPO_ROOT = Path(__file__).resolve().parent.parent
LOG_PATH = REPO_ROOT / "data" / "logs" / "corpus_integrity_audit.log"

CHECK_A_SQL = (
    "SELECT id, case_number FROM case_law "
    "WHERE source_kind = 'external_upload' AND case_number ~ '^ערר|^בל\"מ' "
    "ORDER BY case_number"
)

CHECK_B_SQL = (
    "SELECT id, case_number, chair_name, district FROM case_law "
    "WHERE source_kind = 'internal_committee' "
    "AND (chair_name IS NULL OR chair_name = '' "
    " OR district IS NULL OR district = '') "
    "ORDER BY case_number"
)

CHECK_C_SQL = (
    "SELECT id, case_number, practice_area FROM cases "
    "WHERE practice_area IS NOT NULL "
    "AND practice_area NOT IN ('rishuy_uvniya', 'betterment_levy', "
    " 'compensation_197', '') "
    "ORDER BY case_number"
)

# D. cases that reached a decided state but have no chair_name. An empty chair
# silently breaks the internal_committee corpus copy of the final
# (case_law_internal_chair_check); the chair must be set at source (INV-G1).
CHECK_D_SQL = (
    "SELECT id, case_number, status FROM cases "
    "WHERE status IN ('final', 'exported', 'reviewed') "
    "AND (chair_name IS NULL OR chair_name = '') "
    "ORDER BY case_number"
)

# E. SIGNED finals that never landed in the citable precedent corpus
# (case_law, source_kind='internal_committee'). Only status='final' means the
# chair's signed decision was ingested; 'exported' is merely OUR draft DOCX
# and legitimately has no precedent copy. This is the exact failure the
# chair_name fix prevents going forward; the check catches any regression.
CHECK_E_SQL = (
    "SELECT c.id, c.case_number, c.status FROM cases c "
    "WHERE c.status = 'final' "
    "AND NOT EXISTS (SELECT 1 FROM case_law cl "
    " WHERE cl.case_number = c.case_number "
    " AND cl.source_kind = 'internal_committee') "
    "ORDER BY c.case_number"
)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger("audit_corpus_integrity")


def _pg_url() -> str:
    """Resolve POSTGRES URL from env, falling back to discrete vars."""
    url = os.environ.get("POSTGRES_URL")
    if url:
        return url
    pg_host = os.environ.get("POSTGRES_HOST", "127.0.0.1")
    pg_port = int(os.environ.get("POSTGRES_PORT", "5433"))
    pg_user = os.environ.get("POSTGRES_USER", "legal_ai")
    pg_pw = os.environ.get("POSTGRES_PASSWORD", "")
    pg_db = os.environ.get("POSTGRES_DB", "legal_ai")
    if not pg_pw:
        raise SystemExit("POSTGRES_PASSWORD / POSTGRES_URL not set")
    return f"postgres://{pg_user}:{pg_pw}@{pg_host}:{pg_port}/{pg_db}"


async def _run_check(conn: asyncpg.Connection, sql: str) -> list[dict]:
    """Run one read-only check query and return its rows as plain dicts."""
    rows = await conn.fetch(sql)
    return [dict(r) for r in rows]


async def _resolve_ceo_agent_id() -> str | None:
    """Best-effort: look up the CEO agent UUID for CMP via the API.

    Returns None if PAPERCLIP env is missing or the lookup fails.
    """
    base_url = os.environ.get("PAPERCLIP_API_URL")
    api_key = os.environ.get("PAPERCLIP_API_KEY")
    if not (base_url and api_key and httpx is not None):
        return None
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            r = await client.get(
                f"{base_url}/api/agents",
                headers={"Authorization": f"Bearer {api_key}"},
            )
            r.raise_for_status()
            payload = r.json()
            items = payload if isinstance(payload, list) else payload.get("items", [])
            for item in items:
                # Look for a CMP-side CEO (master); the CMPA mirror has a different id.
                title = (item.get("title") or "").lower()
                role = (item.get("role") or "").lower()
                if "ceo" in title or "ceo" in role or "מנכ" in title:
                    return item.get("id")
    except Exception as e:
        logger.warning("CEO lookup failed: %s", e)
    return None


async def _notify_ceo(summary: str) -> bool:
    """Post a wakeup comment to the CEO agent. Returns True on best-effort success."""
    base_url = os.environ.get("PAPERCLIP_API_URL")
    api_key = os.environ.get("PAPERCLIP_API_KEY")
    if not (base_url and api_key and httpx is not None):
        logger.info("Paperclip env not set; skipping CEO wakeup")
        return False
    ceo_id = await _resolve_ceo_agent_id()
    if not ceo_id:
        logger.info("Could not resolve CEO agent id; skipping wakeup")
        return False
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            r = await client.post(
                f"{base_url}/api/agents/{ceo_id}/wakeup",
                headers={
                    "Authorization": f"Bearer {api_key}",
                    "Content-Type": "application/json",
                },
                json={
                    "source": "automation",
                    "triggerDetail": "audit_corpus_integrity",
                    "reason": "corpus integrity audit found violations",
                    "payload": {"summary": summary},
                },
            )
            r.raise_for_status()
        logger.info("Notified CEO (agent_id=%s)", ceo_id)
        return True
    except Exception as e:
        logger.warning("CEO wakeup failed: %s", e)
        return False


def _format_report(
    a_hits: list[dict],
    b_hits: list[dict],
    c_hits: list[dict],
    d_hits: list[dict],
    e_hits: list[dict],
    ts: datetime,
) -> str:
    parts: list[str] = []
    parts.append(f"=== Corpus integrity audit @ {ts.isoformat()} ===")
    parts.append("")

    parts.append(
        f"Check A (case_law external_upload with internal-style "
        f"case_number prefix): {len(a_hits)} hit(s)"
    )
    for row in a_hits[:50]:
        parts.append(f" - id={row['id']} case_number={row['case_number']!r}")
    if len(a_hits) > 50:
        parts.append(f" ... ({len(a_hits) - 50} more truncated)")
    parts.append("")

    parts.append(
        f"Check B (case_law internal_committee missing chair_name/district): "
        f"{len(b_hits)} hit(s)"
    )
    for row in b_hits[:50]:
        parts.append(
            f" - id={row['id']} case_number={row['case_number']!r} "
            f"chair_name={row.get('chair_name')!r} district={row.get('district')!r}"
        )
    if len(b_hits) > 50:
        parts.append(f" ... ({len(b_hits) - 50} more truncated)")
    parts.append("")

    parts.append(
        f"Check C (cases.practice_area outside closed set): {len(c_hits)} hit(s)"
    )
    for row in c_hits[:50]:
        parts.append(
            f" - id={row['id']} case_number={row['case_number']!r} "
            f"practice_area={row.get('practice_area')!r}"
        )
    if len(c_hits) > 50:
        parts.append(f" ... ({len(c_hits) - 50} more truncated)")
    parts.append("")

    parts.append(
        f"Check D (decided cases missing chair_name): {len(d_hits)} hit(s)"
    )
    for row in d_hits[:50]:
        parts.append(
            f" - id={row['id']} case_number={row['case_number']!r} "
            f"status={row.get('status')!r}"
        )
    if len(d_hits) > 50:
        parts.append(f" ... ({len(d_hits) - 50} more truncated)")
    parts.append("")

    parts.append(
        f"Check E (signed-final cases missing from internal_committee "
        f"precedent corpus): {len(e_hits)} hit(s)"
    )
    for row in e_hits[:50]:
        parts.append(
            f" - id={row['id']} case_number={row['case_number']!r} "
            f"status={row.get('status')!r}"
        )
    if len(e_hits) > 50:
        parts.append(f" ... ({len(e_hits) - 50} more truncated)")
    parts.append("")
    return "\n".join(parts)


async def main(args: argparse.Namespace) -> int:
    pg_url = _pg_url()
    conn = await asyncpg.connect(pg_url)
    try:
        a_hits = await _run_check(conn, CHECK_A_SQL)
        b_hits = await _run_check(conn, CHECK_B_SQL)
        c_hits = await _run_check(conn, CHECK_C_SQL)
        d_hits = await _run_check(conn, CHECK_D_SQL)
        e_hits = await _run_check(conn, CHECK_E_SQL)
    finally:
        await conn.close()

    total = len(a_hits) + len(b_hits) + len(c_hits) + len(d_hits) + len(e_hits)
    ts = datetime.now(timezone.utc)
    report = _format_report(a_hits, b_hits, c_hits, d_hits, e_hits, ts)

    # Always write to log (creates dir + file if missing).
    LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
    with LOG_PATH.open("a", encoding="utf-8") as f:
        f.write(report)
        f.write("\n")

    # Echo to stdout so cron mail / manual run shows the result.
    print(report)

    if total == 0:
        logger.info("clean: no integrity violations found")
        return 0

    logger.warning(
        "found %d total violation(s) (A=%d, B=%d, C=%d, D=%d, E=%d)",
        total, len(a_hits), len(b_hits), len(c_hits), len(d_hits), len(e_hits),
    )

    if args.notify:
        # The CEO summary text is Hebrew. Header: "The daily corpus audit found
        # violations:"; one count per check A-E; footer: "Full details: <LOG_PATH>".
        summary_lines = [
            "ה-audit היומי על הקורפוס מצא הפרות:",
            f"- Check A (external_upload עם prefix פנימי): {len(a_hits)}",
            f"- Check B (internal_committee חסר chair/district): {len(b_hits)}",
            f"- Check C (cases.practice_area לא תקין): {len(c_hits)}",
            f"- Check D (תיקים מוכרעים ללא chair_name): {len(d_hits)}",
            f"- Check E (סופיים חסרים מקורפוס-הפסיקה הפנימי): {len(e_hits)}",
            "",
            f"פירוט מלא: {LOG_PATH}",
        ]
        await _notify_ceo("\n".join(summary_lines))
    return 0


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=__doc__,
        # Keep the docstring's line breaks and indentation in --help output.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--no-notify",
        dest="notify",
        action="store_false",
        help="Don't post a CEO wakeup even if hits are found",
    )
    parser.set_defaults(notify=True)
    args = parser.parse_args()
    try:
        rc = asyncio.run(main(args))
    except KeyboardInterrupt:
        sys.exit(130)
    sys.exit(rc)
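
To reason about what Checks A and E flag without a database, the following sketch mirrors their SQL over in-memory rows. It is an illustration, not part of the script: the `check_a` / `check_e` helpers are hypothetical, they assume `case_number` is never NULL, and they sort by Python code-point order, which may differ from the database collation used by `ORDER BY`. Check E is an anti-join: a final case is reported when no matching `internal_committee` row exists.

```python
"""Pure-Python mirror of Check A and Check E (illustrative only)."""
from __future__ import annotations

import re

# Same pattern as CHECK_A_SQL. PostgreSQL's ``~`` matches anywhere in the
# string, so the ``^`` anchors are what restrict it to a prefix.
INTERNAL_PREFIX = re.compile(r'^ערר|^בל"מ')


def check_a(case_law: list[dict]) -> list[dict]:
    """external_upload rows whose case_number looks like an internal decision."""
    return sorted(
        (r for r in case_law
         if r["source_kind"] == "external_upload"
         and INTERNAL_PREFIX.search(r["case_number"] or "")),
        key=lambda r: r["case_number"],
    )


def check_e(cases: list[dict], case_law: list[dict]) -> list[dict]:
    """status='final' cases with no internal_committee case_law row (anti-join)."""
    internal = {
        r["case_number"] for r in case_law
        if r["source_kind"] == "internal_committee"
    }
    return sorted(
        (c for c in cases
         if c["status"] == "final" and c["case_number"] not in internal),
        key=lambda c: c["case_number"],
    )
```

A `status='exported'` case never appears in Check E, matching the SQL comment that an exported draft legitimately has no precedent copy.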